agenterra_rmcp/
service.rs

1use futures::{FutureExt, future::BoxFuture};
2use thiserror::Error;
3
4use crate::{
5    error::Error as McpError,
6    model::{
7        CancelledNotification, CancelledNotificationParam, Extensions, GetExtensions, GetMeta,
8        JsonRpcBatchRequestItem, JsonRpcBatchResponseItem, JsonRpcError, JsonRpcMessage,
9        JsonRpcNotification, JsonRpcRequest, JsonRpcResponse, Meta, NumberOrString, ProgressToken,
10        RequestId, ServerJsonRpcMessage,
11    },
12    transport::{IntoTransport, Transport},
13};
14#[cfg(feature = "client")]
15#[cfg_attr(docsrs, doc(cfg(feature = "client")))]
16mod client;
17#[cfg(feature = "client")]
18#[cfg_attr(docsrs, doc(cfg(feature = "client")))]
19pub use client::*;
20#[cfg(feature = "server")]
21#[cfg_attr(docsrs, doc(cfg(feature = "server")))]
22mod server;
23#[cfg(feature = "server")]
24#[cfg_attr(docsrs, doc(cfg(feature = "server")))]
25pub use server::*;
26#[cfg(feature = "tower")]
27#[cfg_attr(docsrs, doc(cfg(feature = "tower")))]
28mod tower;
29use tokio_util::sync::{CancellationToken, DropGuard};
30#[cfg(feature = "tower")]
31#[cfg_attr(docsrs, doc(cfg(feature = "tower")))]
32pub use tower::*;
33use tracing::instrument;
34#[derive(Error, Debug)]
35#[non_exhaustive]
36pub enum ServiceError {
37    #[error("Mcp error: {0}")]
38    McpError(McpError),
39    #[error("Transport send error: {0}")]
40    TransportSend(Box<dyn std::error::Error + Send + Sync>),
41    #[error("Transport closed")]
42    TransportClosed,
43    #[error("Unexpected response type")]
44    UnexpectedResponse,
45    #[error("task cancelled for reason {}", reason.as_deref().unwrap_or("<unknown>"))]
46    Cancelled { reason: Option<String> },
47    #[error("request timeout after {}", chrono::Duration::from_std(*timeout).unwrap_or_default())]
48    Timeout { timeout: Duration },
49}
50
51impl ServiceError {}
52trait TransferObject:
53    std::fmt::Debug + Clone + serde::Serialize + serde::de::DeserializeOwned + Send + Sync + 'static
54{
55}
56
57impl<T> TransferObject for T where
58    T: std::fmt::Debug
59        + serde::Serialize
60        + serde::de::DeserializeOwned
61        + Send
62        + Sync
63        + 'static
64        + Clone
65{
66}
67
68#[allow(private_bounds, reason = "there's no the third implementation")]
69pub trait ServiceRole: std::fmt::Debug + Send + Sync + 'static + Copy + Clone {
70    type Req: TransferObject + GetMeta + GetExtensions;
71    type Resp: TransferObject;
72    type Not: TryInto<CancelledNotification, Error = Self::Not>
73        + From<CancelledNotification>
74        + TransferObject;
75    type PeerReq: TransferObject + GetMeta + GetExtensions;
76    type PeerResp: TransferObject;
77    type PeerNot: TryInto<CancelledNotification, Error = Self::PeerNot>
78        + From<CancelledNotification>
79        + TransferObject
80        + GetMeta
81        + GetExtensions;
82    type InitializeError<E>;
83    const IS_CLIENT: bool;
84    type Info: TransferObject;
85    type PeerInfo: TransferObject;
86}
87
88pub type TxJsonRpcMessage<R> =
89    JsonRpcMessage<<R as ServiceRole>::Req, <R as ServiceRole>::Resp, <R as ServiceRole>::Not>;
90pub type RxJsonRpcMessage<R> = JsonRpcMessage<
91    <R as ServiceRole>::PeerReq,
92    <R as ServiceRole>::PeerResp,
93    <R as ServiceRole>::PeerNot,
94>;
95
96pub trait Service<R: ServiceRole>: Send + Sync + 'static {
97    fn handle_request(
98        &self,
99        request: R::PeerReq,
100        context: RequestContext<R>,
101    ) -> impl Future<Output = Result<R::Resp, McpError>> + Send + '_;
102    fn handle_notification(
103        &self,
104        notification: R::PeerNot,
105        context: NotificationContext<R>,
106    ) -> impl Future<Output = Result<(), McpError>> + Send + '_;
107    fn get_info(&self) -> R::Info;
108}
109
110pub trait ServiceExt<R: ServiceRole>: Service<R> + Sized {
111    /// Convert this service to a dynamic boxed service
112    ///
113    /// This could be very helpful when you want to store the services in a collection
114    fn into_dyn(self) -> Box<dyn DynService<R>> {
115        Box::new(self)
116    }
117    fn serve<T, E, A>(
118        self,
119        transport: T,
120    ) -> impl Future<Output = Result<RunningService<R, Self>, R::InitializeError<E>>> + Send
121    where
122        T: IntoTransport<R, E, A>,
123        E: std::error::Error + From<std::io::Error> + Send + Sync + 'static,
124        Self: Sized,
125    {
126        Self::serve_with_ct(self, transport, Default::default())
127    }
128    fn serve_with_ct<T, E, A>(
129        self,
130        transport: T,
131        ct: CancellationToken,
132    ) -> impl Future<Output = Result<RunningService<R, Self>, R::InitializeError<E>>> + Send
133    where
134        T: IntoTransport<R, E, A>,
135        E: std::error::Error + From<std::io::Error> + Send + Sync + 'static,
136        Self: Sized;
137}
138
139impl<R: ServiceRole> Service<R> for Box<dyn DynService<R>> {
140    fn handle_request(
141        &self,
142        request: R::PeerReq,
143        context: RequestContext<R>,
144    ) -> impl Future<Output = Result<R::Resp, McpError>> + Send + '_ {
145        DynService::handle_request(self.as_ref(), request, context)
146    }
147
148    fn handle_notification(
149        &self,
150        notification: R::PeerNot,
151        context: NotificationContext<R>,
152    ) -> impl Future<Output = Result<(), McpError>> + Send + '_ {
153        DynService::handle_notification(self.as_ref(), notification, context)
154    }
155
156    fn get_info(&self) -> R::Info {
157        DynService::get_info(self.as_ref())
158    }
159}
160
161pub trait DynService<R: ServiceRole>: Send + Sync {
162    fn handle_request(
163        &self,
164        request: R::PeerReq,
165        context: RequestContext<R>,
166    ) -> BoxFuture<Result<R::Resp, McpError>>;
167    fn handle_notification(
168        &self,
169        notification: R::PeerNot,
170        context: NotificationContext<R>,
171    ) -> BoxFuture<Result<(), McpError>>;
172    fn get_info(&self) -> R::Info;
173}
174
175impl<R: ServiceRole, S: Service<R>> DynService<R> for S {
176    fn handle_request(
177        &self,
178        request: R::PeerReq,
179        context: RequestContext<R>,
180    ) -> BoxFuture<Result<R::Resp, McpError>> {
181        Box::pin(self.handle_request(request, context))
182    }
183    fn handle_notification(
184        &self,
185        notification: R::PeerNot,
186        context: NotificationContext<R>,
187    ) -> BoxFuture<Result<(), McpError>> {
188        Box::pin(self.handle_notification(notification, context))
189    }
190    fn get_info(&self) -> R::Info {
191        self.get_info()
192    }
193}
194
195use std::{
196    collections::{HashMap, VecDeque},
197    ops::Deref,
198    sync::{Arc, atomic::AtomicU32},
199    time::Duration,
200};
201
202use tokio::sync::mpsc;
203
204pub trait RequestIdProvider: Send + Sync + 'static {
205    fn next_request_id(&self) -> RequestId;
206}
207
208pub trait ProgressTokenProvider: Send + Sync + 'static {
209    fn next_progress_token(&self) -> ProgressToken;
210}
211
212pub type AtomicU32RequestIdProvider = AtomicU32Provider;
213pub type AtomicU32ProgressTokenProvider = AtomicU32Provider;
214
215#[derive(Debug, Default)]
216pub struct AtomicU32Provider {
217    id: AtomicU32,
218}
219
220impl RequestIdProvider for AtomicU32Provider {
221    fn next_request_id(&self) -> RequestId {
222        RequestId::Number(self.id.fetch_add(1, std::sync::atomic::Ordering::SeqCst))
223    }
224}
225
226impl ProgressTokenProvider for AtomicU32Provider {
227    fn next_progress_token(&self) -> ProgressToken {
228        ProgressToken(NumberOrString::Number(
229            self.id.fetch_add(1, std::sync::atomic::Ordering::SeqCst),
230        ))
231    }
232}
233
234type Responder<T> = tokio::sync::oneshot::Sender<T>;
235
236/// A handle to a remote request
237///
238/// You can cancel it by call [`RequestHandle::cancel`] with a reason,
239///
240/// or wait for response by call [`RequestHandle::await_response`]
241#[derive(Debug)]
242pub struct RequestHandle<R: ServiceRole> {
243    pub rx: tokio::sync::oneshot::Receiver<Result<R::PeerResp, ServiceError>>,
244    pub options: PeerRequestOptions,
245    pub peer: Peer<R>,
246    pub id: RequestId,
247    pub progress_token: ProgressToken,
248}
249
250impl<R: ServiceRole> RequestHandle<R> {
251    pub const REQUEST_TIMEOUT_REASON: &str = "request timeout";
252    pub async fn await_response(self) -> Result<R::PeerResp, ServiceError> {
253        if let Some(timeout) = self.options.timeout {
254            let timeout_result = tokio::time::timeout(timeout, async move {
255                self.rx.await.map_err(|_e| ServiceError::TransportClosed)?
256            })
257            .await;
258            match timeout_result {
259                Ok(response) => response,
260                Err(_) => {
261                    let error = Err(ServiceError::Timeout { timeout });
262                    // cancel this request
263                    let notification = CancelledNotification {
264                        params: CancelledNotificationParam {
265                            request_id: self.id,
266                            reason: Some(Self::REQUEST_TIMEOUT_REASON.to_owned()),
267                        },
268                        method: crate::model::CancelledNotificationMethod,
269                        extensions: Default::default(),
270                    };
271                    let _ = self.peer.send_notification(notification.into()).await;
272                    error
273                }
274            }
275        } else {
276            self.rx.await.map_err(|_e| ServiceError::TransportClosed)?
277        }
278    }
279
280    /// Cancel this request
281    pub async fn cancel(self, reason: Option<String>) -> Result<(), ServiceError> {
282        let notification = CancelledNotification {
283            params: CancelledNotificationParam {
284                request_id: self.id,
285                reason,
286            },
287            method: crate::model::CancelledNotificationMethod,
288            extensions: Default::default(),
289        };
290        self.peer.send_notification(notification.into()).await?;
291        Ok(())
292    }
293}
294
295#[derive(Debug)]
296pub(crate) enum PeerSinkMessage<R: ServiceRole> {
297    Request {
298        request: R::Req,
299        id: RequestId,
300        responder: Responder<Result<R::PeerResp, ServiceError>>,
301    },
302    Notification {
303        notification: R::Not,
304        responder: Responder<Result<(), ServiceError>>,
305    },
306}
307
308/// An interface to fetch the remote client or server
309///
310/// For general purpose, call [`Peer::send_request`] or [`Peer::send_notification`] to send message to remote peer.
311///
312/// To create a cancellable request, call [`Peer::send_request_with_option`].
313#[derive(Clone)]
314pub struct Peer<R: ServiceRole> {
315    tx: mpsc::Sender<PeerSinkMessage<R>>,
316    request_id_provider: Arc<dyn RequestIdProvider>,
317    progress_token_provider: Arc<dyn ProgressTokenProvider>,
318    info: Arc<tokio::sync::OnceCell<R::PeerInfo>>,
319}
320
321impl<R: ServiceRole> std::fmt::Debug for Peer<R> {
322    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
323        f.debug_struct("PeerSink")
324            .field("tx", &self.tx)
325            .field("is_client", &R::IS_CLIENT)
326            .finish()
327    }
328}
329
330type ProxyOutbound<R> = mpsc::Receiver<PeerSinkMessage<R>>;
331
332#[derive(Debug, Default)]
333pub struct PeerRequestOptions {
334    pub timeout: Option<Duration>,
335    pub meta: Option<Meta>,
336}
337
338impl PeerRequestOptions {
339    pub fn no_options() -> Self {
340        Self::default()
341    }
342}
343
344impl<R: ServiceRole> Peer<R> {
345    const CLIENT_CHANNEL_BUFFER_SIZE: usize = 1024;
346    pub(crate) fn new(
347        request_id_provider: Arc<dyn RequestIdProvider>,
348        peer_info: Option<R::PeerInfo>,
349    ) -> (Peer<R>, ProxyOutbound<R>) {
350        let (tx, rx) = mpsc::channel(Self::CLIENT_CHANNEL_BUFFER_SIZE);
351        (
352            Self {
353                tx,
354                request_id_provider,
355                progress_token_provider: Arc::new(AtomicU32ProgressTokenProvider::default()),
356                info: Arc::new(tokio::sync::OnceCell::new_with(peer_info)),
357            },
358            rx,
359        )
360    }
361    pub async fn send_notification(&self, notification: R::Not) -> Result<(), ServiceError> {
362        let (responder, receiver) = tokio::sync::oneshot::channel();
363        self.tx
364            .send(PeerSinkMessage::Notification {
365                notification,
366                responder,
367            })
368            .await
369            .map_err(|_m| ServiceError::TransportClosed)?;
370        receiver.await.map_err(|_e| ServiceError::TransportClosed)?
371    }
372    pub async fn send_request(&self, request: R::Req) -> Result<R::PeerResp, ServiceError> {
373        self.send_request_with_option(request, PeerRequestOptions::no_options())
374            .await?
375            .await_response()
376            .await
377    }
378
379    pub async fn send_cancellable_request(
380        &self,
381        request: R::Req,
382        options: PeerRequestOptions,
383    ) -> Result<RequestHandle<R>, ServiceError> {
384        self.send_request_with_option(request, options).await
385    }
386
387    pub async fn send_request_with_option(
388        &self,
389        mut request: R::Req,
390        options: PeerRequestOptions,
391    ) -> Result<RequestHandle<R>, ServiceError> {
392        let id = self.request_id_provider.next_request_id();
393        let progress_token = self.progress_token_provider.next_progress_token();
394        request
395            .get_meta_mut()
396            .set_progress_token(progress_token.clone());
397        if let Some(meta) = options.meta.clone() {
398            request.get_meta_mut().extend(meta);
399        }
400        let (responder, receiver) = tokio::sync::oneshot::channel();
401        self.tx
402            .send(PeerSinkMessage::Request {
403                request,
404                id: id.clone(),
405                responder,
406            })
407            .await
408            .map_err(|_m| ServiceError::TransportClosed)?;
409        Ok(RequestHandle {
410            id,
411            rx: receiver,
412            progress_token,
413            options,
414            peer: self.clone(),
415        })
416    }
417    pub fn peer_info(&self) -> Option<&R::PeerInfo> {
418        self.info.get()
419    }
420
421    pub fn set_peer_info(&self, info: R::PeerInfo) {
422        if self.info.initialized() {
423            tracing::warn!("trying to set peer info, which is already initialized");
424        } else {
425            let _ = self.info.set(info);
426        }
427    }
428
429    pub fn is_transport_closed(&self) -> bool {
430        self.tx.is_closed()
431    }
432}
433
434#[derive(Debug)]
435pub struct RunningService<R: ServiceRole, S: Service<R>> {
436    service: Arc<S>,
437    peer: Peer<R>,
438    handle: tokio::task::JoinHandle<QuitReason>,
439    cancellation_token: CancellationToken,
440    dg: DropGuard,
441}
442impl<R: ServiceRole, S: Service<R>> Deref for RunningService<R, S> {
443    type Target = Peer<R>;
444
445    fn deref(&self) -> &Self::Target {
446        &self.peer
447    }
448}
449
450impl<R: ServiceRole, S: Service<R>> RunningService<R, S> {
451    #[inline]
452    pub fn peer(&self) -> &Peer<R> {
453        &self.peer
454    }
455    #[inline]
456    pub fn service(&self) -> &S {
457        self.service.as_ref()
458    }
459    #[inline]
460    pub fn cancellation_token(&self) -> RunningServiceCancellationToken {
461        RunningServiceCancellationToken(self.cancellation_token.clone())
462    }
463    #[inline]
464    pub async fn waiting(self) -> Result<QuitReason, tokio::task::JoinError> {
465        self.handle.await
466    }
467    pub async fn cancel(self) -> Result<QuitReason, tokio::task::JoinError> {
468        let RunningService { dg, handle, .. } = self;
469        dg.disarm().cancel();
470        handle.await
471    }
472}
473
474// use a wrapper type so we can tweak the implementation if needed
475pub struct RunningServiceCancellationToken(CancellationToken);
476
477impl RunningServiceCancellationToken {
478    pub fn cancel(self) {
479        self.0.cancel();
480    }
481}
482
483#[derive(Debug)]
484pub enum QuitReason {
485    Cancelled,
486    Closed,
487    JoinError(tokio::task::JoinError),
488}
489
490/// Request execution context
491#[derive(Debug, Clone)]
492pub struct RequestContext<R: ServiceRole> {
493    /// this token will be cancelled when the [`CancelledNotification`] is received.
494    pub ct: CancellationToken,
495    pub id: RequestId,
496    pub meta: Meta,
497    pub extensions: Extensions,
498    /// An interface to fetch the remote client or server
499    pub peer: Peer<R>,
500}
501
502/// Request execution context
503#[derive(Debug, Clone)]
504pub struct NotificationContext<R: ServiceRole> {
505    pub meta: Meta,
506    pub extensions: Extensions,
507    /// An interface to fetch the remote client or server
508    pub peer: Peer<R>,
509}
510
511/// Use this function to skip initialization process
512pub fn serve_directly<R, S, T, E, A>(
513    service: S,
514    transport: T,
515    peer_info: Option<R::PeerInfo>,
516) -> RunningService<R, S>
517where
518    R: ServiceRole,
519    S: Service<R>,
520    T: IntoTransport<R, E, A>,
521    E: std::error::Error + Send + Sync + 'static,
522{
523    serve_directly_with_ct(service, transport, peer_info, Default::default())
524}
525
526/// Use this function to skip initialization process
527pub fn serve_directly_with_ct<R, S, T, E, A>(
528    service: S,
529    transport: T,
530    peer_info: Option<R::PeerInfo>,
531    ct: CancellationToken,
532) -> RunningService<R, S>
533where
534    R: ServiceRole,
535    S: Service<R>,
536    T: IntoTransport<R, E, A>,
537    E: std::error::Error + Send + Sync + 'static,
538{
539    let (peer, peer_rx) = Peer::new(Arc::new(AtomicU32RequestIdProvider::default()), peer_info);
540    serve_inner(service, transport, peer, peer_rx, ct)
541}
542
543#[instrument(skip_all)]
544fn serve_inner<R, S, T, E, A>(
545    service: S,
546    transport: T,
547    peer: Peer<R>,
548    mut peer_rx: tokio::sync::mpsc::Receiver<PeerSinkMessage<R>>,
549    ct: CancellationToken,
550) -> RunningService<R, S>
551where
552    R: ServiceRole,
553    S: Service<R>,
554    T: IntoTransport<R, E, A>,
555    E: std::error::Error + Send + Sync + 'static,
556{
557    const SINK_PROXY_BUFFER_SIZE: usize = 64;
558    let (sink_proxy_tx, mut sink_proxy_rx) =
559        tokio::sync::mpsc::channel::<TxJsonRpcMessage<R>>(SINK_PROXY_BUFFER_SIZE);
560    let peer_info = peer.peer_info();
561    if R::IS_CLIENT {
562        tracing::info!(?peer_info, "Service initialized as client");
563    } else {
564        tracing::info!(?peer_info, "Service initialized as server");
565    }
566
567    let mut local_responder_pool =
568        HashMap::<RequestId, Responder<Result<R::PeerResp, ServiceError>>>::new();
569    let mut local_ct_pool = HashMap::<RequestId, CancellationToken>::new();
570    let shared_service = Arc::new(service);
571    // for return
572    let service = shared_service.clone();
573
574    // let message_sink = tokio::sync::
575    // let mut stream = std::pin::pin!(stream);
576    let serve_loop_ct = ct.child_token();
577    let peer_return: Peer<R> = peer.clone();
578    let handle = tokio::spawn(async move {
579        let mut transport = transport.into_transport();
580        let mut batch_messages = VecDeque::<RxJsonRpcMessage<R>>::new();
581        let mut send_task_set = tokio::task::JoinSet::<SendTaskResult<E>>::new();
582        #[derive(Debug)]
583        enum SendTaskResult<E> {
584            Request {
585                id: RequestId,
586                result: Result<(), E>,
587            },
588            Notification {
589                responder: Responder<Result<(), ServiceError>>,
590                cancellation_param: Option<CancelledNotificationParam>,
591                result: Result<(), E>,
592            },
593        }
594        #[derive(Debug)]
595        enum Event<R: ServiceRole, E> {
596            ProxyMessage(PeerSinkMessage<R>),
597            PeerMessage(RxJsonRpcMessage<R>),
598            ToSink(TxJsonRpcMessage<R>),
599            SendTaskResult(SendTaskResult<E>),
600        }
601
602        let quit_reason = loop {
603            let evt = if let Some(m) = batch_messages.pop_front() {
604                Event::PeerMessage(m)
605            } else {
606                tokio::select! {
607                    m = sink_proxy_rx.recv(), if !sink_proxy_rx.is_closed() => {
608                        if let Some(m) = m {
609                            Event::ToSink(m)
610                        } else {
611                            continue
612                        }
613                    }
614                    m = transport.receive() => {
615                        if let Some(m) = m {
616                            Event::PeerMessage(m)
617                        } else {
618                            // input stream closed
619                            tracing::info!("input stream terminated");
620                            break QuitReason::Closed
621                        }
622                    }
623                    m = peer_rx.recv(), if !peer_rx.is_closed() => {
624                        if let Some(m) = m {
625                            Event::ProxyMessage(m)
626                        } else {
627                            continue
628                        }
629                    }
630                    m = send_task_set.join_next(), if !send_task_set.is_empty() => {
631                        let Some(result) = m else {
632                            continue
633                        };
634                        match result {
635                            Err(e) => {
636                                // join error, which is serious, we should quit.
637                                tracing::error!(%e, "send request task encounter a tokio join error");
638                                break QuitReason::JoinError(e)
639                            }
640                            Ok(result) => {
641                                Event::SendTaskResult(result)
642                            }
643                        }
644                    }
645                    _ = serve_loop_ct.cancelled() => {
646                        tracing::info!("task cancelled");
647                        break QuitReason::Cancelled
648                    }
649                }
650            };
651
652            tracing::trace!(?evt, "new event");
653            match evt {
654                Event::SendTaskResult(SendTaskResult::Request { id, result }) => {
655                    if let Err(e) = result {
656                        if let Some(responder) = local_responder_pool.remove(&id) {
657                            let _ = responder.send(Err(ServiceError::TransportSend(Box::new(e))));
658                        }
659                    }
660                }
661                Event::SendTaskResult(SendTaskResult::Notification {
662                    responder,
663                    result,
664                    cancellation_param,
665                }) => {
666                    let response = if let Err(e) = result {
667                        Err(ServiceError::TransportSend(Box::new(e)))
668                    } else {
669                        Ok(())
670                    };
671                    let _ = responder.send(response);
672                    if let Some(param) = cancellation_param {
673                        if let Some(responder) = local_responder_pool.remove(&param.request_id) {
674                            tracing::info!(id = %param.request_id, reason = param.reason, "cancelled");
675                            let _response_result = responder.send(Err(ServiceError::Cancelled {
676                                reason: param.reason.clone(),
677                            }));
678                        }
679                    }
680                }
681                // response and error
682                Event::ToSink(m) => {
683                    if let Some(id) = match &m {
684                        JsonRpcMessage::Response(response) => Some(&response.id),
685                        JsonRpcMessage::Error(error) => Some(&error.id),
686                        _ => None,
687                    } {
688                        if let Some(ct) = local_ct_pool.remove(id) {
689                            ct.cancel();
690                        }
691                        let send = transport.send(m);
692                        tokio::spawn(async move {
693                            let send_result = send.await;
694                            if let Err(error) = send_result {
695                                tracing::error!(%error, "fail to response message");
696                            }
697                        });
698                    }
699                }
700                Event::ProxyMessage(PeerSinkMessage::Request {
701                    request,
702                    id,
703                    responder,
704                }) => {
705                    local_responder_pool.insert(id.clone(), responder);
706                    let send = transport.send(JsonRpcMessage::request(request, id.clone()));
707                    {
708                        let id = id.clone();
709                        send_task_set
710                            .spawn(send.map(move |r| SendTaskResult::Request { id, result: r }));
711                    }
712                }
713                Event::ProxyMessage(PeerSinkMessage::Notification {
714                    notification,
715                    responder,
716                }) => {
717                    // catch cancellation notification
718                    let mut cancellation_param = None;
719                    let notification = match notification.try_into() {
720                        Ok::<CancelledNotification, _>(cancelled) => {
721                            cancellation_param.replace(cancelled.params.clone());
722                            cancelled.into()
723                        }
724                        Err(notification) => notification,
725                    };
726                    let send = transport.send(JsonRpcMessage::notification(notification));
727                    send_task_set.spawn(send.map(move |result| SendTaskResult::Notification {
728                        responder,
729                        cancellation_param,
730                        result,
731                    }));
732                }
733                Event::PeerMessage(JsonRpcMessage::Request(JsonRpcRequest {
734                    id,
735                    mut request,
736                    ..
737                })) => {
738                    tracing::debug!(%id, ?request, "received request");
739                    {
740                        let service = shared_service.clone();
741                        let sink = sink_proxy_tx.clone();
742                        let request_ct = serve_loop_ct.child_token();
743                        let context_ct = request_ct.child_token();
744                        local_ct_pool.insert(id.clone(), request_ct);
745                        let mut extensions = Extensions::new();
746                        let mut meta = Meta::new();
747                        // avoid clone
748                        std::mem::swap(&mut extensions, request.extensions_mut());
749                        std::mem::swap(&mut meta, request.get_meta_mut());
750                        let context = RequestContext {
751                            ct: context_ct,
752                            id: id.clone(),
753                            peer: peer.clone(),
754                            meta,
755                            extensions,
756                        };
757                        tokio::spawn(async move {
758                            let result = service.handle_request(request, context).await;
759                            let response = match result {
760                                Ok(result) => {
761                                    tracing::debug!(%id, ?result, "response message");
762                                    JsonRpcMessage::response(result, id)
763                                }
764                                Err(error) => {
765                                    tracing::warn!(%id, ?error, "response error");
766                                    JsonRpcMessage::error(error, id)
767                                }
768                            };
769                            let _send_result = sink.send(response).await;
770                        });
771                    }
772                }
773                Event::PeerMessage(JsonRpcMessage::Notification(JsonRpcNotification {
774                    notification,
775                    ..
776                })) => {
777                    tracing::info!(?notification, "received notification");
778                    // catch cancelled notification
779                    let mut notification = match notification.try_into() {
780                        Ok::<CancelledNotification, _>(cancelled) => {
781                            if let Some(ct) = local_ct_pool.remove(&cancelled.params.request_id) {
782                                tracing::info!(id = %cancelled.params.request_id, reason = cancelled.params.reason, "cancelled");
783                                ct.cancel();
784                            }
785                            cancelled.into()
786                        }
787                        Err(notification) => notification,
788                    };
789                    {
790                        let service = shared_service.clone();
791                        let mut extensions = Extensions::new();
792                        let mut meta = Meta::new();
793                        // avoid clone
794                        std::mem::swap(&mut extensions, notification.extensions_mut());
795                        std::mem::swap(&mut meta, notification.get_meta_mut());
796                        let context = NotificationContext {
797                            peer: peer.clone(),
798                            meta,
799                            extensions,
800                        };
801                        tokio::spawn(async move {
802                            let result = service.handle_notification(notification, context).await;
803                            if let Err(error) = result {
804                                tracing::warn!(%error, "Error sending notification");
805                            }
806                        });
807                    }
808                }
809                Event::PeerMessage(JsonRpcMessage::Response(JsonRpcResponse {
810                    result,
811                    id,
812                    ..
813                })) => {
814                    if let Some(responder) = local_responder_pool.remove(&id) {
815                        let response_result = responder.send(Ok(result));
816                        if let Err(_error) = response_result {
817                            tracing::warn!(%id, "Error sending response");
818                        }
819                    }
820                }
821                Event::PeerMessage(JsonRpcMessage::Error(JsonRpcError { error, id, .. })) => {
822                    if let Some(responder) = local_responder_pool.remove(&id) {
823                        let _response_result = responder.send(Err(ServiceError::McpError(error)));
824                        if let Err(_error) = _response_result {
825                            tracing::warn!(%id, "Error sending response");
826                        }
827                    }
828                }
829                Event::PeerMessage(JsonRpcMessage::BatchRequest(batch)) => {
830                    batch_messages.extend(
831                        batch
832                            .into_iter()
833                            .map(JsonRpcBatchRequestItem::into_non_batch_message),
834                    );
835                }
836                Event::PeerMessage(JsonRpcMessage::BatchResponse(batch)) => {
837                    batch_messages.extend(
838                        batch
839                            .into_iter()
840                            .map(JsonRpcBatchResponseItem::into_non_batch_message),
841                    );
842                }
843            }
844        };
845        let sink_close_result = transport.close().await;
846        if let Err(e) = sink_close_result {
847            tracing::error!(%e, "fail to close sink");
848        }
849        tracing::info!(?quit_reason, "serve finished");
850        quit_reason
851    });
852    RunningService {
853        service,
854        peer: peer_return,
855        handle,
856        cancellation_token: ct.clone(),
857        dg: ct.drop_guard(),
858    }
859}