// rmcp/service.rs

1use futures::{FutureExt, future::BoxFuture};
2use thiserror::Error;
3
4#[cfg(feature = "server")]
5use crate::model::ServerJsonRpcMessage;
6use crate::{
7    error::ErrorData as McpError,
8    model::{
9        CancelledNotification, CancelledNotificationParam, Extensions, GetExtensions, GetMeta,
10        JsonRpcError, JsonRpcMessage, JsonRpcNotification, JsonRpcRequest, JsonRpcResponse, Meta,
11        NumberOrString, ProgressToken, RequestId,
12    },
13    transport::{DynamicTransportError, IntoTransport, Transport},
14};
15#[cfg(feature = "client")]
16mod client;
17#[cfg(feature = "client")]
18pub use client::*;
19#[cfg(feature = "server")]
20mod server;
21#[cfg(feature = "server")]
22pub use server::*;
23#[cfg(feature = "tower")]
24mod tower;
25use tokio_util::sync::{CancellationToken, DropGuard};
26#[cfg(feature = "tower")]
27pub use tower::*;
28use tracing::{Instrument as _, instrument};
/// Errors surfaced to callers of [`Peer::send_request`] and related methods.
#[derive(Error, Debug)]
#[non_exhaustive]
pub enum ServiceError {
    /// The remote peer answered with a JSON-RPC error payload.
    #[error("Mcp error: {0}")]
    McpError(McpError),
    /// The underlying transport failed while sending a message.
    #[error("Transport send error: {0}")]
    TransportSend(DynamicTransportError),
    /// The transport (or an internal channel) closed before completion.
    #[error("Transport closed")]
    TransportClosed,
    /// The peer replied with a response of an unexpected type.
    #[error("Unexpected response type")]
    UnexpectedResponse,
    /// The request was cancelled, optionally with a human-readable reason.
    #[error("task cancelled for reason {}", reason.as_deref().unwrap_or("<unknown>"))]
    Cancelled { reason: Option<String> },
    /// The request did not complete within the configured timeout.
    #[error("request timeout after {}", chrono::Duration::from_std(*timeout).unwrap_or_default())]
    Timeout { timeout: Duration },
}
45
/// Internal helper bound for anything that can cross the wire: debuggable,
/// cloneable, (de)serializable, and safe to move across threads.
trait TransferObject:
    std::fmt::Debug + Clone + serde::Serialize + serde::de::DeserializeOwned + Send + Sync + 'static
{
}
50
// Blanket impl: any type meeting the bounds is automatically a
// `TransferObject`; no manual opt-in is required.
impl<T> TransferObject for T where
    T: std::fmt::Debug
        + serde::Serialize
        + serde::de::DeserializeOwned
        + Send
        + Sync
        + 'static
        + Clone
{
}
61
62#[allow(private_bounds, reason = "there's no the third implementation")]
63pub trait ServiceRole: std::fmt::Debug + Send + Sync + 'static + Copy + Clone {
64    type Req: TransferObject + GetMeta + GetExtensions;
65    type Resp: TransferObject;
66    type Not: TryInto<CancelledNotification, Error = Self::Not>
67        + From<CancelledNotification>
68        + TransferObject;
69    type PeerReq: TransferObject + GetMeta + GetExtensions;
70    type PeerResp: TransferObject;
71    type PeerNot: TryInto<CancelledNotification, Error = Self::PeerNot>
72        + From<CancelledNotification>
73        + TransferObject
74        + GetMeta
75        + GetExtensions;
76    type InitializeError;
77    const IS_CLIENT: bool;
78    type Info: TransferObject;
79    type PeerInfo: TransferObject;
80}
81
/// Outgoing JSON-RPC message type for role `R` (what this side sends).
pub type TxJsonRpcMessage<R> =
    JsonRpcMessage<<R as ServiceRole>::Req, <R as ServiceRole>::Resp, <R as ServiceRole>::Not>;
/// Incoming JSON-RPC message type for role `R` (what the peer sends us).
pub type RxJsonRpcMessage<R> = JsonRpcMessage<
    <R as ServiceRole>::PeerReq,
    <R as ServiceRole>::PeerResp,
    <R as ServiceRole>::PeerNot,
>;
89
/// The user-implemented handler for one side of an MCP connection.
pub trait Service<R: ServiceRole>: Send + Sync + 'static {
    /// Handle a request from the remote peer, producing either a response or
    /// an error that is sent back as a JSON-RPC error.
    fn handle_request(
        &self,
        request: R::PeerReq,
        context: RequestContext<R>,
    ) -> impl Future<Output = Result<R::Resp, McpError>> + Send + '_;
    /// Handle a notification from the remote peer (no response is sent).
    fn handle_notification(
        &self,
        notification: R::PeerNot,
        context: NotificationContext<R>,
    ) -> impl Future<Output = Result<(), McpError>> + Send + '_;
    /// Information describing this service.
    fn get_info(&self) -> R::Info;
}
103
/// Extension methods for boxing a [`Service`] and starting it on a transport.
pub trait ServiceExt<R: ServiceRole>: Service<R> + Sized {
    /// Convert this service to a dynamic boxed service
    ///
    /// This could be very helpful when you want to store the services in a collection
    fn into_dyn(self) -> Box<dyn DynService<R>> {
        Box::new(self)
    }
    /// Serve this service over `transport` with a default (fresh)
    /// cancellation token.
    fn serve<T, E, A>(
        self,
        transport: T,
    ) -> impl Future<Output = Result<RunningService<R, Self>, R::InitializeError>> + Send
    where
        T: IntoTransport<R, E, A>,
        E: std::error::Error + Send + Sync + 'static,
        Self: Sized,
    {
        Self::serve_with_ct(self, transport, Default::default())
    }
    /// Serve this service over `transport`, tying the running task to `ct`
    /// so callers can cancel it externally.
    fn serve_with_ct<T, E, A>(
        self,
        transport: T,
        ct: CancellationToken,
    ) -> impl Future<Output = Result<RunningService<R, Self>, R::InitializeError>> + Send
    where
        T: IntoTransport<R, E, A>,
        E: std::error::Error + Send + Sync + 'static,
        Self: Sized;
}
132
// A boxed dynamic service is itself a `Service`: each call is forwarded to
// the boxed object, so `Box<dyn DynService<R>>` can be used wherever a
// concrete `Service<R>` is expected.
impl<R: ServiceRole> Service<R> for Box<dyn DynService<R>> {
    fn handle_request(
        &self,
        request: R::PeerReq,
        context: RequestContext<R>,
    ) -> impl Future<Output = Result<R::Resp, McpError>> + Send + '_ {
        DynService::handle_request(self.as_ref(), request, context)
    }

    fn handle_notification(
        &self,
        notification: R::PeerNot,
        context: NotificationContext<R>,
    ) -> impl Future<Output = Result<(), McpError>> + Send + '_ {
        DynService::handle_notification(self.as_ref(), notification, context)
    }

    fn get_info(&self) -> R::Info {
        DynService::get_info(self.as_ref())
    }
}
154
/// Object-safe twin of [`Service`], returning boxed futures so it can be
/// used as a trait object (see [`ServiceExt::into_dyn`]).
pub trait DynService<R: ServiceRole>: Send + Sync {
    /// See [`Service::handle_request`].
    fn handle_request(
        &self,
        request: R::PeerReq,
        context: RequestContext<R>,
    ) -> BoxFuture<'_, Result<R::Resp, McpError>>;
    /// See [`Service::handle_notification`].
    fn handle_notification(
        &self,
        notification: R::PeerNot,
        context: NotificationContext<R>,
    ) -> BoxFuture<'_, Result<(), McpError>>;
    /// See [`Service::get_info`].
    fn get_info(&self) -> R::Info;
}
168
// Blanket impl: every concrete `Service` is a `DynService` by boxing the
// futures it returns.
impl<R: ServiceRole, S: Service<R>> DynService<R> for S {
    fn handle_request(
        &self,
        request: R::PeerReq,
        context: RequestContext<R>,
    ) -> BoxFuture<'_, Result<R::Resp, McpError>> {
        Box::pin(self.handle_request(request, context))
    }
    fn handle_notification(
        &self,
        notification: R::PeerNot,
        context: NotificationContext<R>,
    ) -> BoxFuture<'_, Result<(), McpError>> {
        Box::pin(self.handle_notification(notification, context))
    }
    fn get_info(&self) -> R::Info {
        self.get_info()
    }
}
188
189use std::{
190    collections::{HashMap, VecDeque},
191    ops::Deref,
192    sync::{Arc, atomic::AtomicU64},
193    time::Duration,
194};
195
196use tokio::sync::mpsc;
197
/// Source of unique ids for outgoing requests.
pub trait RequestIdProvider: Send + Sync + 'static {
    fn next_request_id(&self) -> RequestId;
}
201
/// Source of unique progress tokens attached to outgoing requests.
pub trait ProgressTokenProvider: Send + Sync + 'static {
    fn next_progress_token(&self) -> ProgressToken;
}
205
/// Request-id provider alias.
pub type AtomicU32RequestIdProvider = AtomicU32Provider;
/// Progress-token provider alias (same underlying counter type).
pub type AtomicU32ProgressTokenProvider = AtomicU32Provider;

/// Monotonically increasing id/token source.
///
/// NOTE(review): despite the `U32` in the (public, hence unrenamable) name,
/// the counter is a 64-bit atomic.
#[derive(Debug, Default)]
pub struct AtomicU32Provider {
    // Next value to hand out; bumped with `fetch_add`.
    id: AtomicU64,
}
213
214impl RequestIdProvider for AtomicU32Provider {
215    fn next_request_id(&self) -> RequestId {
216        let id = self.id.fetch_add(1, std::sync::atomic::Ordering::SeqCst);
217        // Safe conversion: we start at 0 and increment by 1, so we won't overflow i64::MAX in practice
218        RequestId::Number(id as i64)
219    }
220}
221
222impl ProgressTokenProvider for AtomicU32Provider {
223    fn next_progress_token(&self) -> ProgressToken {
224        let id = self.id.fetch_add(1, std::sync::atomic::Ordering::SeqCst);
225        ProgressToken(NumberOrString::Number(id as i64))
226    }
227}
228
/// One-shot channel used to deliver the outcome of a queued operation.
type Responder<T> = tokio::sync::oneshot::Sender<T>;
230
/// A handle to a remote request
///
/// You can cancel it by call [`RequestHandle::cancel`] with a reason,
///
/// or wait for response by call [`RequestHandle::await_response`]
#[derive(Debug)]
pub struct RequestHandle<R: ServiceRole> {
    /// Receives the peer's response (or a `ServiceError`) exactly once.
    pub rx: tokio::sync::oneshot::Receiver<Result<R::PeerResp, ServiceError>>,
    /// Options the request was sent with (timeout, extra meta).
    pub options: PeerRequestOptions,
    /// Peer handle, used to send a cancellation notification if needed.
    pub peer: Peer<R>,
    /// Id of the in-flight request.
    pub id: RequestId,
    /// Progress token that was attached to the request's meta.
    pub progress_token: ProgressToken,
}
244
impl<R: ServiceRole> RequestHandle<R> {
    /// Reason string sent in the cancellation notification on timeout.
    pub const REQUEST_TIMEOUT_REASON: &str = "request timeout";
    /// Wait for the peer's response, honoring `options.timeout` if set.
    ///
    /// On timeout, a [`CancelledNotification`] is sent to the peer on a
    /// best-effort basis and [`ServiceError::Timeout`] is returned.
    pub async fn await_response(self) -> Result<R::PeerResp, ServiceError> {
        if let Some(timeout) = self.options.timeout {
            let timeout_result = tokio::time::timeout(timeout, async move {
                self.rx.await.map_err(|_e| ServiceError::TransportClosed)?
            })
            .await;
            match timeout_result {
                Ok(response) => response,
                Err(_) => {
                    let error = Err(ServiceError::Timeout { timeout });
                    // cancel this request
                    let notification = CancelledNotification {
                        params: CancelledNotificationParam {
                            request_id: self.id,
                            reason: Some(Self::REQUEST_TIMEOUT_REASON.to_owned()),
                        },
                        method: crate::model::CancelledNotificationMethod,
                        extensions: Default::default(),
                    };
                    // Best effort: the timeout error is returned regardless
                    // of whether the cancellation could be delivered.
                    let _ = self.peer.send_notification(notification.into()).await;
                    error
                }
            }
        } else {
            // No timeout configured: wait indefinitely for the response.
            self.rx.await.map_err(|_e| ServiceError::TransportClosed)?
        }
    }

    /// Cancel this request
    pub async fn cancel(self, reason: Option<String>) -> Result<(), ServiceError> {
        let notification = CancelledNotification {
            params: CancelledNotificationParam {
                request_id: self.id,
                reason,
            },
            method: crate::model::CancelledNotificationMethod,
            extensions: Default::default(),
        };
        self.peer.send_notification(notification.into()).await?;
        Ok(())
    }
}
289
/// Messages queued from [`Peer`] handles to the service loop, which owns the
/// transport sink.
#[derive(Debug)]
pub(crate) enum PeerSinkMessage<R: ServiceRole> {
    /// An outgoing request; `responder` eventually receives the response.
    Request {
        request: R::Req,
        id: RequestId,
        responder: Responder<Result<R::PeerResp, ServiceError>>,
    },
    /// An outgoing notification; `responder` reports the send result.
    Notification {
        notification: R::Not,
        responder: Responder<Result<(), ServiceError>>,
    },
}
302
/// An interface to fetch the remote client or server
///
/// For general purpose, call [`Peer::send_request`] or [`Peer::send_notification`] to send message to remote peer.
///
/// To create a cancellable request, call [`Peer::send_request_with_option`].
#[derive(Clone)]
pub struct Peer<R: ServiceRole> {
    // Queue into the service loop that owns the transport.
    tx: mpsc::Sender<PeerSinkMessage<R>>,
    // Source of unique request ids.
    request_id_provider: Arc<dyn RequestIdProvider>,
    // Source of unique progress tokens.
    progress_token_provider: Arc<dyn ProgressTokenProvider>,
    // Remote peer info; set at most once.
    info: Arc<tokio::sync::OnceCell<R::PeerInfo>>,
}
315
316impl<R: ServiceRole> std::fmt::Debug for Peer<R> {
317    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
318        f.debug_struct("PeerSink")
319            .field("tx", &self.tx)
320            .field("is_client", &R::IS_CLIENT)
321            .finish()
322    }
323}
324
/// Receiving half of the peer → service-loop message queue.
type ProxyOutbound<R> = mpsc::Receiver<PeerSinkMessage<R>>;

/// Per-request options for [`Peer::send_request_with_option`].
#[derive(Debug, Default)]
pub struct PeerRequestOptions {
    /// Optional response timeout; `None` waits indefinitely.
    pub timeout: Option<Duration>,
    /// Extra metadata merged into the request's meta before sending.
    pub meta: Option<Meta>,
}
332
333impl PeerRequestOptions {
334    pub fn no_options() -> Self {
335        Self::default()
336    }
337}
338
impl<R: ServiceRole> Peer<R> {
    // Capacity of the peer → service-loop queue.
    const CLIENT_CHANNEL_BUFFER_SIZE: usize = 1024;
    /// Create a peer handle together with the receiver the service loop
    /// drains.
    pub(crate) fn new(
        request_id_provider: Arc<dyn RequestIdProvider>,
        peer_info: Option<R::PeerInfo>,
    ) -> (Peer<R>, ProxyOutbound<R>) {
        let (tx, rx) = mpsc::channel(Self::CLIENT_CHANNEL_BUFFER_SIZE);
        (
            Self {
                tx,
                request_id_provider,
                progress_token_provider: Arc::new(AtomicU32ProgressTokenProvider::default()),
                info: Arc::new(tokio::sync::OnceCell::new_with(peer_info)),
            },
            rx,
        )
    }
    /// Send a notification and wait until the service loop reports the
    /// transport-level send result.
    pub async fn send_notification(&self, notification: R::Not) -> Result<(), ServiceError> {
        let (responder, receiver) = tokio::sync::oneshot::channel();
        self.tx
            .send(PeerSinkMessage::Notification {
                notification,
                responder,
            })
            .await
            .map_err(|_m| ServiceError::TransportClosed)?;
        receiver.await.map_err(|_e| ServiceError::TransportClosed)?
    }
    /// Send a request with default options and wait for the response.
    pub async fn send_request(&self, request: R::Req) -> Result<R::PeerResp, ServiceError> {
        self.send_request_with_option(request, PeerRequestOptions::no_options())
            .await?
            .await_response()
            .await
    }

    /// Alias for [`Peer::send_request_with_option`]; the returned
    /// [`RequestHandle`] can be cancelled.
    pub async fn send_cancellable_request(
        &self,
        request: R::Req,
        options: PeerRequestOptions,
    ) -> Result<RequestHandle<R>, ServiceError> {
        self.send_request_with_option(request, options).await
    }

    /// Queue a request and return a [`RequestHandle`] for awaiting or
    /// cancelling it. A fresh id and progress token are assigned here.
    pub async fn send_request_with_option(
        &self,
        mut request: R::Req,
        options: PeerRequestOptions,
    ) -> Result<RequestHandle<R>, ServiceError> {
        let id = self.request_id_provider.next_request_id();
        let progress_token = self.progress_token_provider.next_progress_token();
        request
            .get_meta_mut()
            .set_progress_token(progress_token.clone());
        // Caller-supplied meta is merged on top of the progress token.
        if let Some(meta) = options.meta.clone() {
            request.get_meta_mut().extend(meta);
        }
        let (responder, receiver) = tokio::sync::oneshot::channel();
        self.tx
            .send(PeerSinkMessage::Request {
                request,
                id: id.clone(),
                responder,
            })
            .await
            .map_err(|_m| ServiceError::TransportClosed)?;
        Ok(RequestHandle {
            id,
            rx: receiver,
            progress_token,
            options,
            peer: self.clone(),
        })
    }
    /// Peer info, if it has been set yet.
    pub fn peer_info(&self) -> Option<&R::PeerInfo> {
        self.info.get()
    }

    /// Set the peer info once; logs a warning if it was already set.
    pub fn set_peer_info(&self, info: R::PeerInfo) {
        if self.info.initialized() {
            tracing::warn!("trying to set peer info, which is already initialized");
        } else {
            let _ = self.info.set(info);
        }
    }

    /// True if the service loop is no longer draining messages.
    pub fn is_transport_closed(&self) -> bool {
        self.tx.is_closed()
    }
}
428
/// A started service: the user handler plus the background task driving the
/// transport.
#[derive(Debug)]
pub struct RunningService<R: ServiceRole, S: Service<R>> {
    // The user's service, shared with the background task.
    service: Arc<S>,
    // Handle for talking to the remote peer.
    peer: Peer<R>,
    // Join handle of the service loop; `None` once closed or awaited.
    handle: Option<tokio::task::JoinHandle<QuitReason>>,
    // Token that stops the service loop when cancelled.
    cancellation_token: CancellationToken,
    // Guard tied to the token, covering drops without an explicit close.
    dg: DropGuard,
}
// Deref to `Peer` so request/notification methods can be called directly on
// the running service.
impl<R: ServiceRole, S: Service<R>> Deref for RunningService<R, S> {
    type Target = Peer<R>;

    fn deref(&self) -> &Self::Target {
        &self.peer
    }
}
444
impl<R: ServiceRole, S: Service<R>> RunningService<R, S> {
    /// Handle to the remote peer.
    #[inline]
    pub fn peer(&self) -> &Peer<R> {
        &self.peer
    }
    /// The user service driving this connection.
    #[inline]
    pub fn service(&self) -> &S {
        self.service.as_ref()
    }
    /// A token that can cancel this service from elsewhere.
    #[inline]
    pub fn cancellation_token(&self) -> RunningServiceCancellationToken {
        RunningServiceCancellationToken(self.cancellation_token.clone())
    }

    /// Returns true if the service has been closed or cancelled.
    #[inline]
    pub fn is_closed(&self) -> bool {
        self.handle.is_none() || self.cancellation_token.is_cancelled()
    }

    /// Wait for the service to complete.
    ///
    /// This will block until the service loop terminates (either due to
    /// cancellation, transport closure, or an error).
    #[inline]
    pub async fn waiting(mut self) -> Result<QuitReason, tokio::task::JoinError> {
        match self.handle.take() {
            Some(handle) => handle.await,
            None => Ok(QuitReason::Closed),
        }
    }

    /// Gracefully close the connection and wait for cleanup to complete.
    ///
    /// This method cancels the service, waits for the background task to finish
    /// (which includes calling `transport.close()`), and ensures all cleanup
    /// operations complete before returning.
    ///
    /// Unlike [`cancel`](Self::cancel), this method takes `&mut self` and can be
    /// called without consuming the `RunningService`. After calling this method,
    /// the service is considered closed and subsequent operations will fail.
    ///
    /// # Example
    ///
    /// ```rust,ignore
    /// let mut client = ().serve(transport).await?;
    /// // ... use the client ...
    /// client.close().await?;
    /// ```
    pub async fn close(&mut self) -> Result<QuitReason, tokio::task::JoinError> {
        if let Some(handle) = self.handle.take() {
            // Cancel the loop and wait for the background task to finish;
            // taking `handle` marks the service as closed.
            self.cancellation_token.cancel();
            handle.await
        } else {
            // Already closed
            Ok(QuitReason::Closed)
        }
    }

    /// Gracefully close the connection with a timeout.
    ///
    /// Similar to [`close`](Self::close), but returns after the specified timeout
    /// if the cleanup doesn't complete in time. This is useful for ensuring
    /// a bounded shutdown time.
    ///
    /// Returns `Ok(Some(reason))` if shutdown completed within the timeout,
    /// `Ok(None)` if the timeout was reached, or `Err` if there was a join error.
    pub async fn close_with_timeout(
        &mut self,
        timeout: Duration,
    ) -> Result<Option<QuitReason>, tokio::task::JoinError> {
        if let Some(handle) = self.handle.take() {
            self.cancellation_token.cancel();
            match tokio::time::timeout(timeout, handle).await {
                Ok(result) => result.map(Some),
                Err(_elapsed) => {
                    tracing::warn!(
                        "close_with_timeout: cleanup did not complete within {:?}",
                        timeout
                    );
                    Ok(None)
                }
            }
        } else {
            Ok(Some(QuitReason::Closed))
        }
    }

    /// Cancel the service and wait for cleanup to complete.
    ///
    /// This consumes the `RunningService` and ensures the connection is properly
    /// closed. For a non-consuming alternative, see [`close`](Self::close).
    pub async fn cancel(mut self) -> Result<QuitReason, tokio::task::JoinError> {
        // NOTE(review): swapping in a fresh guard drops the old one, which
        // cancels the token immediately; this is harmless because close()
        // cancels the same token explicitly right after.
        let _ = std::mem::replace(&mut self.dg, self.cancellation_token.clone().drop_guard());
        self.close().await
    }
}
545
546impl<R: ServiceRole, S: Service<R>> Drop for RunningService<R, S> {
547    fn drop(&mut self) {
548        if self.handle.is_some() && !self.cancellation_token.is_cancelled() {
549            tracing::debug!(
550                "RunningService dropped without explicit close(). \
551                 The connection will be closed asynchronously. \
552                 For guaranteed cleanup, call close() or cancel() before dropping."
553            );
554        }
555        // The DropGuard will handle cancellation
556    }
557}
558
// use a wrapper type so we can tweak the implementation if needed
pub struct RunningServiceCancellationToken(CancellationToken);

impl RunningServiceCancellationToken {
    /// Cancel the associated running service.
    pub fn cancel(self) {
        self.0.cancel();
    }
}
567
/// Why the service loop terminated.
#[derive(Debug)]
#[non_exhaustive]
pub enum QuitReason {
    /// Stopped via the cancellation token.
    Cancelled,
    /// The transport's input stream ended, or the service was already closed.
    Closed,
    /// The background task failed to join (e.g. it panicked).
    JoinError(tokio::task::JoinError),
}
575
/// Request execution context
#[derive(Debug, Clone)]
pub struct RequestContext<R: ServiceRole> {
    /// this token will be cancelled when the [`CancelledNotification`] is received.
    pub ct: CancellationToken,
    /// Id of the request being handled.
    pub id: RequestId,
    /// Meta taken from the incoming request.
    pub meta: Meta,
    /// Extensions taken from the incoming request.
    pub extensions: Extensions,
    /// An interface to fetch the remote client or server
    pub peer: Peer<R>,
}
587
588impl<R: ServiceRole> RequestContext<R> {
589    /// Create a new RequestContext.
590    pub fn new(id: RequestId, peer: Peer<R>) -> Self {
591        Self {
592            ct: CancellationToken::new(),
593            id,
594            meta: Meta::default(),
595            extensions: Extensions::default(),
596            peer,
597        }
598    }
599}
600
/// Notification execution context
#[derive(Debug, Clone)]
pub struct NotificationContext<R: ServiceRole> {
    /// Meta taken from the incoming notification.
    pub meta: Meta,
    /// Extensions taken from the incoming notification.
    pub extensions: Extensions,
    /// An interface to fetch the remote client or server
    pub peer: Peer<R>,
}
609
/// Use this function to skip initialization process
pub fn serve_directly<R, S, T, E, A>(
    service: S,
    transport: T,
    peer_info: Option<R::PeerInfo>,
) -> RunningService<R, S>
where
    R: ServiceRole,
    S: Service<R>,
    T: IntoTransport<R, E, A>,
    E: std::error::Error + Send + Sync + 'static,
{
    // Delegates with a default (fresh) cancellation token.
    serve_directly_with_ct(service, transport, peer_info, Default::default())
}
624
/// Use this function to skip initialization process
pub fn serve_directly_with_ct<R, S, T, E, A>(
    service: S,
    transport: T,
    peer_info: Option<R::PeerInfo>,
    ct: CancellationToken,
) -> RunningService<R, S>
where
    R: ServiceRole,
    S: Service<R>,
    T: IntoTransport<R, E, A>,
    E: std::error::Error + Send + Sync + 'static,
{
    // Build the peer handle up front and hand its receiving end to the
    // service loop.
    let (peer, peer_rx) = Peer::new(Arc::new(AtomicU32RequestIdProvider::default()), peer_info);
    serve_inner(service, transport.into_transport(), peer, peer_rx, ct)
}
641
642#[instrument(skip_all)]
643fn serve_inner<R, S, T>(
644    service: S,
645    transport: T,
646    peer: Peer<R>,
647    mut peer_rx: tokio::sync::mpsc::Receiver<PeerSinkMessage<R>>,
648    ct: CancellationToken,
649) -> RunningService<R, S>
650where
651    R: ServiceRole,
652    S: Service<R>,
653    T: Transport<R> + 'static,
654{
655    const SINK_PROXY_BUFFER_SIZE: usize = 64;
656    let (sink_proxy_tx, mut sink_proxy_rx) =
657        tokio::sync::mpsc::channel::<TxJsonRpcMessage<R>>(SINK_PROXY_BUFFER_SIZE);
658    let peer_info = peer.peer_info();
659    if R::IS_CLIENT {
660        tracing::info!(?peer_info, "Service initialized as client");
661    } else {
662        tracing::info!(?peer_info, "Service initialized as server");
663    }
664
665    let mut local_responder_pool =
666        HashMap::<RequestId, Responder<Result<R::PeerResp, ServiceError>>>::new();
667    let mut local_ct_pool = HashMap::<RequestId, CancellationToken>::new();
668    let shared_service = Arc::new(service);
669    // for return
670    let service = shared_service.clone();
671
672    // let message_sink = tokio::sync::
673    // let mut stream = std::pin::pin!(stream);
674    let serve_loop_ct = ct.child_token();
675    let peer_return: Peer<R> = peer.clone();
676    let current_span = tracing::Span::current();
677    let handle = tokio::spawn(async move {
678        let mut transport = transport.into_transport();
679        let mut batch_messages = VecDeque::<RxJsonRpcMessage<R>>::new();
680        let mut send_task_set = tokio::task::JoinSet::<SendTaskResult>::new();
681        #[derive(Debug)]
682        enum SendTaskResult {
683            Request {
684                id: RequestId,
685                result: Result<(), DynamicTransportError>,
686            },
687            Notification {
688                responder: Responder<Result<(), ServiceError>>,
689                cancellation_param: Option<CancelledNotificationParam>,
690                result: Result<(), DynamicTransportError>,
691            },
692        }
693        #[derive(Debug)]
694        enum Event<R: ServiceRole> {
695            ProxyMessage(PeerSinkMessage<R>),
696            PeerMessage(RxJsonRpcMessage<R>),
697            ToSink(TxJsonRpcMessage<R>),
698            SendTaskResult(SendTaskResult),
699        }
700
701        let quit_reason = loop {
702            let evt = if let Some(m) = batch_messages.pop_front() {
703                Event::PeerMessage(m)
704            } else {
705                tokio::select! {
706                    m = sink_proxy_rx.recv(), if !sink_proxy_rx.is_closed() => {
707                        if let Some(m) = m {
708                            Event::ToSink(m)
709                        } else {
710                            continue
711                        }
712                    }
713                    m = transport.receive() => {
714                        if let Some(m) = m {
715                            Event::PeerMessage(m)
716                        } else {
717                            // input stream closed
718                            tracing::info!("input stream terminated");
719                            break QuitReason::Closed
720                        }
721                    }
722                    m = peer_rx.recv(), if !peer_rx.is_closed() => {
723                        if let Some(m) = m {
724                            Event::ProxyMessage(m)
725                        } else {
726                            continue
727                        }
728                    }
729                    m = send_task_set.join_next(), if !send_task_set.is_empty() => {
730                        let Some(result) = m else {
731                            continue
732                        };
733                        match result {
734                            Err(e) => {
735                                // join error, which is serious, we should quit.
736                                tracing::error!(%e, "send request task encounter a tokio join error");
737                                break QuitReason::JoinError(e)
738                            }
739                            Ok(result) => {
740                                Event::SendTaskResult(result)
741                            }
742                        }
743                    }
744                    _ = serve_loop_ct.cancelled() => {
745                        tracing::info!("task cancelled");
746                        break QuitReason::Cancelled
747                    }
748                }
749            };
750
751            tracing::trace!(?evt, "new event");
752            match evt {
753                Event::SendTaskResult(SendTaskResult::Request { id, result }) => {
754                    if let Err(e) = result {
755                        if let Some(responder) = local_responder_pool.remove(&id) {
756                            let _ = responder.send(Err(ServiceError::TransportSend(e)));
757                        }
758                    }
759                }
760                Event::SendTaskResult(SendTaskResult::Notification {
761                    responder,
762                    result,
763                    cancellation_param,
764                }) => {
765                    let response = if let Err(e) = result {
766                        Err(ServiceError::TransportSend(e))
767                    } else {
768                        Ok(())
769                    };
770                    let _ = responder.send(response);
771                    if let Some(param) = cancellation_param {
772                        if let Some(responder) = local_responder_pool.remove(&param.request_id) {
773                            tracing::info!(id = %param.request_id, reason = param.reason, "cancelled");
774                            let _response_result = responder.send(Err(ServiceError::Cancelled {
775                                reason: param.reason.clone(),
776                            }));
777                        }
778                    }
779                }
780                // response and error
781                Event::ToSink(m) => {
782                    if let Some(id) = match &m {
783                        JsonRpcMessage::Response(response) => Some(&response.id),
784                        JsonRpcMessage::Error(error) => Some(&error.id),
785                        _ => None,
786                    } {
787                        if let Some(ct) = local_ct_pool.remove(id) {
788                            ct.cancel();
789                        }
790                        let send = transport.send(m);
791                        let current_span = tracing::Span::current();
792                        tokio::spawn(async move {
793                            let send_result = send.await;
794                            if let Err(error) = send_result {
795                                tracing::error!(%error, "fail to response message");
796                            }
797                        }.instrument(current_span));
798                    }
799                }
800                Event::ProxyMessage(PeerSinkMessage::Request {
801                    request,
802                    id,
803                    responder,
804                }) => {
805                    local_responder_pool.insert(id.clone(), responder);
806                    let send = transport.send(JsonRpcMessage::request(request, id.clone()));
807                    {
808                        let id = id.clone();
809                        let current_span = tracing::Span::current();
                        // Spawn the transport send as a background task so the event
                        // loop itself never blocks on I/O. The task reports back as a
                        // SendTaskResult::Request tagged with the request id; transport
                        // errors are boxed into DynamicTransportError.
810                        send_task_set.spawn(send.map(move |r| SendTaskResult::Request {
811                            id,
812                            result: r.map_err(DynamicTransportError::new::<T, R>),
813                        }).instrument(current_span));
814                    }
815                }
                // Outbound notification requested locally through the peer sink proxy.
816                Event::ProxyMessage(PeerSinkMessage::Notification {
817                    notification,
818                    responder,
819                }) => {
820                    // catch cancellation notification
                    // If this is a CancelledNotification, keep a copy of its params so
                    // the send-completion handler can act on it — presumably to clean
                    // up the matching pending request (consumer is outside this view).
821                    let mut cancellation_param = None;
822                    let notification = match notification.try_into() {
823                        Ok::<CancelledNotification, _>(cancelled) => {
824                            cancellation_param.replace(cancelled.params.clone());
825                            cancelled.into()
826                        }
                        // Not a cancellation: forward unchanged.
827                        Err(notification) => notification,
828                    };
829                    let send = transport.send(JsonRpcMessage::notification(notification));
830                    let current_span = tracing::Span::current();
                    // Spawn the send; the completion carries the responder plus the
                    // optional cancellation params captured above.
831                    send_task_set.spawn(send.map(move |result| SendTaskResult::Notification {
832                        responder,
833                        cancellation_param,
834                        result: result.map_err(DynamicTransportError::new::<T, R>),
835                    }).instrument(current_span));
836                }
                // Inbound request from the remote peer: dispatch to the local service.
837                Event::PeerMessage(JsonRpcMessage::Request(JsonRpcRequest {
838                    id,
839                    mut request,
840                    ..
841                })) => {
842                    tracing::debug!(%id, ?request, "received request");
843                    {
844                        let service = shared_service.clone();
845                        let sink = sink_proxy_tx.clone();
                        // Per-request cancellation: request_ct is registered under the
                        // request id so a later CancelledNotification from the peer can
                        // cancel it (see the Notification arm below); the handler
                        // observes cancellation via the child context_ct.
846                        let request_ct = serve_loop_ct.child_token();
847                        let context_ct = request_ct.child_token();
848                        local_ct_pool.insert(id.clone(), request_ct);
849                        let mut extensions = Extensions::new();
850                        let mut meta = Meta::new();
851                        // avoid clone
852                        // swap meta first, otherwise the progress token would be lost
853                        std::mem::swap(&mut meta, request.get_meta_mut());
854                        std::mem::swap(&mut extensions, request.extensions_mut());
855                        let context = RequestContext {
856                            ct: context_ct,
857                            id: id.clone(),
858                            peer: peer.clone(),
859                            meta,
860                            extensions,
861                        };
862                        let current_span = tracing::Span::current();
                        // Handle the request concurrently; the resulting response (or
                        // JSON-RPC error) is routed back through the sink proxy channel.
863                        tokio::spawn(async move {
864                            let result = service
865                                .handle_request(request, context)
866                                .await;
867                            let response = match result {
868                                Ok(result) => {
869                                    tracing::debug!(%id, ?result, "response message");
870                                    JsonRpcMessage::response(result, id)
871                                }
872                                Err(error) => {
873                                    tracing::warn!(%id, ?error, "response error");
874                                    JsonRpcMessage::error(error, id)
875                                }
876                            };
                            // Best-effort: the send result is deliberately ignored here.
877                            let _send_result = sink.send(response).await;
878                        }.instrument(current_span));
879                    }
880                }
                // Inbound notification from the remote peer.
881                Event::PeerMessage(JsonRpcMessage::Notification(JsonRpcNotification {
882                    notification,
883                    ..
884                })) => {
885                    tracing::info!(?notification, "received notification");
886                    // catch cancelled notification
                    // A CancelledNotification cancels the token registered for the
                    // referenced request id (if still pending); the notification is
                    // then still forwarded to the service like any other.
887                    let mut notification = match notification.try_into() {
888                        Ok::<CancelledNotification, _>(cancelled) => {
889                            if let Some(ct) = local_ct_pool.remove(&cancelled.params.request_id) {
890                                tracing::info!(id = %cancelled.params.request_id, reason = cancelled.params.reason, "cancelled");
891                                ct.cancel();
892                            }
893                            cancelled.into()
894                        }
895                        Err(notification) => notification,
896                    };
897                    {
898                        let service = shared_service.clone();
899                        let mut extensions = Extensions::new();
900                        let mut meta = Meta::new();
901                        // avoid clone
902                        std::mem::swap(&mut extensions, notification.extensions_mut());
903                        std::mem::swap(&mut meta, notification.get_meta_mut());
904                        let context = NotificationContext {
905                            peer: peer.clone(),
906                            meta,
907                            extensions,
908                        };
909                        let current_span = tracing::Span::current();
                        // Notifications produce no reply; handler errors are only logged.
910                        tokio::spawn(async move {
911                            let result = service.handle_notification(notification, context).await;
912                            if let Err(error) = result {
913                                tracing::warn!(%error, "Error sending notification");
914                            }
915                        }.instrument(current_span));
916                    }
917                }
                // Inbound response: wake the local caller waiting on this request id.
918                Event::PeerMessage(JsonRpcMessage::Response(JsonRpcResponse {
919                    result,
920                    id,
921                    ..
922                })) => {
923                    if let Some(responder) = local_responder_pool.remove(&id) {
924                        let response_result = responder.send(Ok(result));
                        // The waiting side may already have been dropped; just log.
925                        if let Err(_error) = response_result {
926                            tracing::warn!(%id, "Error sending response");
927                        }
928                    }
929                }
                // Inbound JSON-RPC error: deliver it to the waiting caller as
                // ServiceError::McpError.
930                Event::PeerMessage(JsonRpcMessage::Error(JsonRpcError { error, id, .. })) => {
931                    if let Some(responder) = local_responder_pool.remove(&id) {
932                        let _response_result = responder.send(Err(ServiceError::McpError(error)));
933                        if let Err(_error) = _response_result {
934                            tracing::warn!(%id, "Error sending response");
935                        }
936                    }
937                }
938            }
939        };
        // Event loop has exited: close the transport and report why we quit.
940        let sink_close_result = transport.close().await;
941        if let Err(e) = sink_close_result {
942            tracing::error!(%e, "fail to close sink");
943        }
944        tracing::info!(?quit_reason, "serve finished");
945        quit_reason
946    }.instrument(current_span));
    // Hand back a handle to the running service. The DropGuard cancels `ct`
    // when the RunningService is dropped (tokio_util DropGuard semantics),
    // which shuts down the serve loop spawned above.
947    RunningService {
948        service,
949        peer: peer_return,
950        handle: Some(handle),
951        cancellation_token: ct.clone(),
952        dg: ct.drop_guard(),
953    }
954}