// mcpkit_rs/service.rs

use futures::{FutureExt, future::BoxFuture};
use thiserror::Error;

#[cfg(feature = "server")]
use crate::model::ServerJsonRpcMessage;
use crate::{
    error::ErrorData as McpError,
    model::{
        CancelledNotification, CancelledNotificationParam, Extensions, GetExtensions, GetMeta,
        JsonRpcError, JsonRpcMessage, JsonRpcNotification, JsonRpcRequest, JsonRpcResponse, Meta,
        NumberOrString, ProgressToken, RequestId,
    },
    transport::{DynamicTransportError, IntoTransport, Transport},
};
#[cfg(feature = "client")]
#[cfg_attr(docsrs, doc(cfg(feature = "client")))]
mod client;
#[cfg(feature = "client")]
#[cfg_attr(docsrs, doc(cfg(feature = "client")))]
pub use client::*;
#[cfg(feature = "server")]
#[cfg_attr(docsrs, doc(cfg(feature = "server")))]
mod server;
#[cfg(feature = "server")]
#[cfg_attr(docsrs, doc(cfg(feature = "server")))]
pub use server::*;
#[cfg(feature = "tower")]
#[cfg_attr(docsrs, doc(cfg(feature = "tower")))]
mod tower;
use tokio_util::sync::{CancellationToken, DropGuard};
#[cfg(feature = "tower")]
#[cfg_attr(docsrs, doc(cfg(feature = "tower")))]
pub use tower::*;
use tracing::{Instrument as _, instrument};

#[derive(Error, Debug)]
#[non_exhaustive]
pub enum ServiceError {
    #[error("MCP error: {0}")]
    McpError(McpError),
    #[error("Transport send error: {0}")]
    TransportSend(DynamicTransportError),
    #[error("Transport closed")]
    TransportClosed,
    #[error("Unexpected response type")]
    UnexpectedResponse,
    #[error("Task cancelled for reason {}", reason.as_deref().unwrap_or("<unknown>"))]
    Cancelled { reason: Option<String> },
    #[error("Request timeout after {}", chrono::Duration::from_std(*timeout).unwrap_or_default())]
    Timeout { timeout: Duration },
}

trait TransferObject:
    std::fmt::Debug + Clone + serde::Serialize + serde::de::DeserializeOwned + Send + Sync + 'static
{
}

impl<T> TransferObject for T where
    T: std::fmt::Debug
        + serde::Serialize
        + serde::de::DeserializeOwned
        + Send
        + Sync
        + 'static
        + Clone
{
}

#[allow(private_bounds, reason = "there is no third implementation")]
pub trait ServiceRole: std::fmt::Debug + Send + Sync + 'static + Copy + Clone {
    type Req: TransferObject + GetMeta + GetExtensions;
    type Resp: TransferObject;
    type Not: TryInto<CancelledNotification, Error = Self::Not>
        + From<CancelledNotification>
        + TransferObject;
    type PeerReq: TransferObject + GetMeta + GetExtensions;
    type PeerResp: TransferObject;
    type PeerNot: TryInto<CancelledNotification, Error = Self::PeerNot>
        + From<CancelledNotification>
        + TransferObject
        + GetMeta
        + GetExtensions;
    type InitializeError;
    const IS_CLIENT: bool;
    type Info: TransferObject;
    type PeerInfo: TransferObject;
}

pub type TxJsonRpcMessage<R> =
    JsonRpcMessage<<R as ServiceRole>::Req, <R as ServiceRole>::Resp, <R as ServiceRole>::Not>;
pub type RxJsonRpcMessage<R> = JsonRpcMessage<
    <R as ServiceRole>::PeerReq,
    <R as ServiceRole>::PeerResp,
    <R as ServiceRole>::PeerNot,
>;

pub trait Service<R: ServiceRole>: Send + Sync + 'static {
    fn handle_request(
        &self,
        request: R::PeerReq,
        context: RequestContext<R>,
    ) -> impl Future<Output = Result<R::Resp, McpError>> + Send + '_;
    fn handle_notification(
        &self,
        notification: R::PeerNot,
        context: NotificationContext<R>,
    ) -> impl Future<Output = Result<(), McpError>> + Send + '_;
    fn get_info(&self) -> R::Info;
}

pub trait ServiceExt<R: ServiceRole>: Service<R> + Sized {
    /// Convert this service into a dynamic boxed service.
    ///
    /// This can be helpful when you want to store services in a collection.
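    ///
    /// # Example
    ///
    /// A minimal sketch, assuming a role type from this crate (`RoleServer`
    /// here is illustrative) and two existing [`Service`] implementations:
    ///
    /// ```rust,ignore
    /// let services: Vec<Box<dyn DynService<RoleServer>>> =
    ///     vec![service_a.into_dyn(), service_b.into_dyn()];
    /// ```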
    fn into_dyn(self) -> Box<dyn DynService<R>> {
        Box::new(self)
    }
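    /// Serve this service over a transport, driving the message loop until
    /// the connection closes.
    ///
    /// A minimal usage sketch; `my_service` and `transport` are placeholders
    /// for your own [`Service`] implementation and any [`IntoTransport`] value.
    ///
    /// ```rust,ignore
    /// let running = my_service.serve(transport).await?;
    /// // ... interact with the remote peer via `running.peer()` ...
    /// running.cancel().await?;
    /// ```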
    fn serve<T, E, A>(
        self,
        transport: T,
    ) -> impl Future<Output = Result<RunningService<R, Self>, R::InitializeError>> + Send
    where
        T: IntoTransport<R, E, A>,
        E: std::error::Error + Send + Sync + 'static,
        Self: Sized,
    {
        Self::serve_with_ct(self, transport, Default::default())
    }
    fn serve_with_ct<T, E, A>(
        self,
        transport: T,
        ct: CancellationToken,
    ) -> impl Future<Output = Result<RunningService<R, Self>, R::InitializeError>> + Send
    where
        T: IntoTransport<R, E, A>,
        E: std::error::Error + Send + Sync + 'static,
        Self: Sized;
}

impl<R: ServiceRole> Service<R> for Box<dyn DynService<R>> {
    fn handle_request(
        &self,
        request: R::PeerReq,
        context: RequestContext<R>,
    ) -> impl Future<Output = Result<R::Resp, McpError>> + Send + '_ {
        DynService::handle_request(self.as_ref(), request, context)
    }

    fn handle_notification(
        &self,
        notification: R::PeerNot,
        context: NotificationContext<R>,
    ) -> impl Future<Output = Result<(), McpError>> + Send + '_ {
        DynService::handle_notification(self.as_ref(), notification, context)
    }

    fn get_info(&self) -> R::Info {
        DynService::get_info(self.as_ref())
    }
}

pub trait DynService<R: ServiceRole>: Send + Sync {
    fn handle_request(
        &self,
        request: R::PeerReq,
        context: RequestContext<R>,
    ) -> BoxFuture<'_, Result<R::Resp, McpError>>;
    fn handle_notification(
        &self,
        notification: R::PeerNot,
        context: NotificationContext<R>,
    ) -> BoxFuture<'_, Result<(), McpError>>;
    fn get_info(&self) -> R::Info;
}

impl<R: ServiceRole, S: Service<R>> DynService<R> for S {
    fn handle_request(
        &self,
        request: R::PeerReq,
        context: RequestContext<R>,
    ) -> BoxFuture<'_, Result<R::Resp, McpError>> {
        Box::pin(self.handle_request(request, context))
    }
    fn handle_notification(
        &self,
        notification: R::PeerNot,
        context: NotificationContext<R>,
    ) -> BoxFuture<'_, Result<(), McpError>> {
        Box::pin(self.handle_notification(notification, context))
    }
    fn get_info(&self) -> R::Info {
        self.get_info()
    }
}

use std::{
    collections::{HashMap, VecDeque},
    ops::Deref,
    sync::{Arc, atomic::AtomicU64},
    time::Duration,
};

use tokio::sync::mpsc;

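/// Provides IDs for outgoing requests.
///
/// A minimal sketch of a custom implementation (`PrefixedIdProvider` is
/// hypothetical); it relies only on [`RequestId::Number`], which is also what
/// the default [`AtomicU32Provider`] produces.
///
/// ```rust,ignore
/// #[derive(Debug, Default)]
/// struct PrefixedIdProvider(std::sync::atomic::AtomicU64);
///
/// impl RequestIdProvider for PrefixedIdProvider {
///     fn next_request_id(&self) -> RequestId {
///         // Keep clear of the default provider's ID space.
///         const OFFSET: u64 = 1 << 32;
///         let id = self.0.fetch_add(1, std::sync::atomic::Ordering::SeqCst);
///         RequestId::Number((OFFSET + id) as i64)
///     }
/// }
/// ```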
pub trait RequestIdProvider: Send + Sync + 'static {
    fn next_request_id(&self) -> RequestId;
}

pub trait ProgressTokenProvider: Send + Sync + 'static {
    fn next_progress_token(&self) -> ProgressToken;
}

// Note: despite the `U32` in these alias names, the underlying counter is an `AtomicU64`.
pub type AtomicU32RequestIdProvider = AtomicU32Provider;
pub type AtomicU32ProgressTokenProvider = AtomicU32Provider;

#[derive(Debug, Default)]
pub struct AtomicU32Provider {
    id: AtomicU64,
}

impl RequestIdProvider for AtomicU32Provider {
    fn next_request_id(&self) -> RequestId {
        let id = self.id.fetch_add(1, std::sync::atomic::Ordering::SeqCst);
        // Safe conversion: we start at 0 and increment by 1, so we won't overflow i64::MAX in practice.
        RequestId::Number(id as i64)
    }
}

impl ProgressTokenProvider for AtomicU32Provider {
    fn next_progress_token(&self) -> ProgressToken {
        let id = self.id.fetch_add(1, std::sync::atomic::Ordering::SeqCst);
        ProgressToken(NumberOrString::Number(id as i64))
    }
}

type Responder<T> = tokio::sync::oneshot::Sender<T>;

/// A handle to an in-flight remote request.
///
/// You can cancel it by calling [`RequestHandle::cancel`] with a reason,
/// or wait for the response by calling [`RequestHandle::await_response`].
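///
/// # Example
///
/// A minimal sketch; `peer` and `request` are placeholders for an existing
/// [`Peer`] and a request value.
///
/// ```rust,ignore
/// let handle = peer
///     .send_request_with_option(request, PeerRequestOptions::no_options())
///     .await?;
/// // Either wait for the response:
/// let response = handle.await_response().await?;
/// // ...or cancel it with a reason:
/// // handle.cancel(Some("no longer needed".into())).await?;
/// ```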
#[derive(Debug)]
pub struct RequestHandle<R: ServiceRole> {
    pub rx: tokio::sync::oneshot::Receiver<Result<R::PeerResp, ServiceError>>,
    pub options: PeerRequestOptions,
    pub peer: Peer<R>,
    pub id: RequestId,
    pub progress_token: ProgressToken,
}

impl<R: ServiceRole> RequestHandle<R> {
    pub const REQUEST_TIMEOUT_REASON: &str = "request timeout";
    pub async fn await_response(self) -> Result<R::PeerResp, ServiceError> {
        if let Some(timeout) = self.options.timeout {
            let timeout_result = tokio::time::timeout(timeout, async move {
                self.rx.await.map_err(|_e| ServiceError::TransportClosed)?
            })
            .await;
            match timeout_result {
                Ok(response) => response,
                Err(_) => {
                    let error = Err(ServiceError::Timeout { timeout });
                    // Cancel this request on the remote side as well.
                    let notification = CancelledNotification {
                        params: CancelledNotificationParam {
                            request_id: self.id,
                            reason: Some(Self::REQUEST_TIMEOUT_REASON.to_owned()),
                        },
                        method: crate::model::CancelledNotificationMethod,
                        extensions: Default::default(),
                    };
                    let _ = self.peer.send_notification(notification.into()).await;
                    error
                }
            }
        } else {
            self.rx.await.map_err(|_e| ServiceError::TransportClosed)?
        }
    }

    /// Cancel this request
    pub async fn cancel(self, reason: Option<String>) -> Result<(), ServiceError> {
        let notification = CancelledNotification {
            params: CancelledNotificationParam {
                request_id: self.id,
                reason,
            },
            method: crate::model::CancelledNotificationMethod,
            extensions: Default::default(),
        };
        self.peer.send_notification(notification.into()).await?;
        Ok(())
    }
}

#[derive(Debug)]
pub(crate) enum PeerSinkMessage<R: ServiceRole> {
    Request {
        request: R::Req,
        id: RequestId,
        responder: Responder<Result<R::PeerResp, ServiceError>>,
    },
    Notification {
        notification: R::Not,
        responder: Responder<Result<(), ServiceError>>,
    },
}

/// An interface for communicating with the remote client or server.
///
/// For general use, call [`Peer::send_request`] or [`Peer::send_notification`] to send a message to the remote peer.
///
/// To create a cancellable request, call [`Peer::send_request_with_option`].
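///
/// # Example
///
/// A minimal sketch; `peer`, `notification`, and `request` are placeholders.
///
/// ```rust,ignore
/// // Fire-and-forget notification:
/// peer.send_notification(notification).await?;
/// // Request that waits for the remote peer's response:
/// let response = peer.send_request(request).await?;
/// ```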
#[derive(Clone)]
pub struct Peer<R: ServiceRole> {
    tx: mpsc::Sender<PeerSinkMessage<R>>,
    request_id_provider: Arc<dyn RequestIdProvider>,
    progress_token_provider: Arc<dyn ProgressTokenProvider>,
    info: Arc<tokio::sync::OnceCell<R::PeerInfo>>,
}

impl<R: ServiceRole> std::fmt::Debug for Peer<R> {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("Peer")
            .field("tx", &self.tx)
            .field("is_client", &R::IS_CLIENT)
            .finish()
    }
}

type ProxyOutbound<R> = mpsc::Receiver<PeerSinkMessage<R>>;

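/// Options for an outgoing request.
///
/// A minimal sketch; setting `timeout` makes [`RequestHandle::await_response`]
/// fail with [`ServiceError::Timeout`] and cancel the request once the
/// deadline passes.
///
/// ```rust,ignore
/// let options = PeerRequestOptions {
///     timeout: Some(std::time::Duration::from_secs(30)),
///     meta: None,
/// };
/// ```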
#[derive(Debug, Default)]
pub struct PeerRequestOptions {
    pub timeout: Option<Duration>,
    pub meta: Option<Meta>,
}

impl PeerRequestOptions {
    pub fn no_options() -> Self {
        Self::default()
    }
}

impl<R: ServiceRole> Peer<R> {
    const CLIENT_CHANNEL_BUFFER_SIZE: usize = 1024;
    pub(crate) fn new(
        request_id_provider: Arc<dyn RequestIdProvider>,
        peer_info: Option<R::PeerInfo>,
    ) -> (Peer<R>, ProxyOutbound<R>) {
        let (tx, rx) = mpsc::channel(Self::CLIENT_CHANNEL_BUFFER_SIZE);
        (
            Self {
                tx,
                request_id_provider,
                progress_token_provider: Arc::new(AtomicU32ProgressTokenProvider::default()),
                info: Arc::new(tokio::sync::OnceCell::new_with(peer_info)),
            },
            rx,
        )
    }
    pub async fn send_notification(&self, notification: R::Not) -> Result<(), ServiceError> {
        let (responder, receiver) = tokio::sync::oneshot::channel();
        self.tx
            .send(PeerSinkMessage::Notification {
                notification,
                responder,
            })
            .await
            .map_err(|_m| ServiceError::TransportClosed)?;
        receiver.await.map_err(|_e| ServiceError::TransportClosed)?
    }
    pub async fn send_request(&self, request: R::Req) -> Result<R::PeerResp, ServiceError> {
        self.send_request_with_option(request, PeerRequestOptions::no_options())
            .await?
            .await_response()
            .await
    }

    pub async fn send_cancellable_request(
        &self,
        request: R::Req,
        options: PeerRequestOptions,
    ) -> Result<RequestHandle<R>, ServiceError> {
        self.send_request_with_option(request, options).await
    }

    pub async fn send_request_with_option(
        &self,
        mut request: R::Req,
        options: PeerRequestOptions,
    ) -> Result<RequestHandle<R>, ServiceError> {
        let id = self.request_id_provider.next_request_id();
        let progress_token = self.progress_token_provider.next_progress_token();
        request
            .get_meta_mut()
            .set_progress_token(progress_token.clone());
        if let Some(meta) = options.meta.clone() {
            request.get_meta_mut().extend(meta);
        }
        let (responder, receiver) = tokio::sync::oneshot::channel();
        self.tx
            .send(PeerSinkMessage::Request {
                request,
                id: id.clone(),
                responder,
            })
            .await
            .map_err(|_m| ServiceError::TransportClosed)?;
        Ok(RequestHandle {
            id,
            rx: receiver,
            progress_token,
            options,
            peer: self.clone(),
        })
    }
    pub fn peer_info(&self) -> Option<&R::PeerInfo> {
        self.info.get()
    }

    pub fn set_peer_info(&self, info: R::PeerInfo) {
        if self.info.initialized() {
            tracing::warn!("attempted to set peer info, but it is already initialized");
        } else {
            let _ = self.info.set(info);
        }
    }

    pub fn is_transport_closed(&self) -> bool {
        self.tx.is_closed()
    }
}

#[derive(Debug)]
pub struct RunningService<R: ServiceRole, S: Service<R>> {
    service: Arc<S>,
    peer: Peer<R>,
    handle: Option<tokio::task::JoinHandle<QuitReason>>,
    cancellation_token: CancellationToken,
    dg: DropGuard,
}

impl<R: ServiceRole, S: Service<R>> Deref for RunningService<R, S> {
    type Target = Peer<R>;

    fn deref(&self) -> &Self::Target {
        &self.peer
    }
}

impl<R: ServiceRole, S: Service<R>> RunningService<R, S> {
    #[inline]
    pub fn peer(&self) -> &Peer<R> {
        &self.peer
    }
    #[inline]
    pub fn service(&self) -> &S {
        self.service.as_ref()
    }
    #[inline]
    pub fn cancellation_token(&self) -> RunningServiceCancellationToken {
        RunningServiceCancellationToken(self.cancellation_token.clone())
    }

    /// Returns true if the service has been closed or cancelled.
    #[inline]
    pub fn is_closed(&self) -> bool {
        self.handle.is_none() || self.cancellation_token.is_cancelled()
    }

    /// Wait for the service to complete.
    ///
    /// This will block until the service loop terminates (either due to
    /// cancellation, transport closure, or an error).
    #[inline]
    pub async fn waiting(mut self) -> Result<QuitReason, tokio::task::JoinError> {
        match self.handle.take() {
            Some(handle) => handle.await,
            None => Ok(QuitReason::Closed),
        }
    }

    /// Gracefully close the connection and wait for cleanup to complete.
    ///
    /// This method cancels the service, waits for the background task to finish
    /// (which includes calling `transport.close()`), and ensures all cleanup
    /// operations complete before returning.
    ///
    /// Unlike [`cancel`](Self::cancel), this method takes `&mut self` and can be
    /// called without consuming the `RunningService`. After calling this method,
    /// the service is considered closed and subsequent operations will fail.
    ///
    /// # Example
    ///
    /// ```rust,ignore
    /// let mut client = ().serve(transport).await?;
    /// // ... use the client ...
    /// client.close().await?;
    /// ```
    pub async fn close(&mut self) -> Result<QuitReason, tokio::task::JoinError> {
        if let Some(handle) = self.handle.take() {
            // Cancel the service loop and wait for the background task
            // (including `transport.close()`) to finish.
            self.cancellation_token.cancel();
            handle.await
        } else {
            // Already closed
            Ok(QuitReason::Closed)
        }
    }

    /// Gracefully close the connection with a timeout.
    ///
    /// Similar to [`close`](Self::close), but returns after the specified timeout
    /// if the cleanup doesn't complete in time. This is useful for ensuring
    /// a bounded shutdown time.
    ///
    /// Returns `Ok(Some(reason))` if shutdown completed within the timeout,
    /// `Ok(None)` if the timeout was reached, or `Err` if there was a join error.
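    ///
    /// # Example
    ///
    /// A minimal sketch of a bounded shutdown:
    ///
    /// ```rust,ignore
    /// match running.close_with_timeout(Duration::from_secs(5)).await? {
    ///     Some(reason) => tracing::info!(?reason, "shut down cleanly"),
    ///     None => tracing::warn!("shutdown timed out; cleanup may still be running"),
    /// }
    /// ```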
    pub async fn close_with_timeout(
        &mut self,
        timeout: Duration,
    ) -> Result<Option<QuitReason>, tokio::task::JoinError> {
        if let Some(handle) = self.handle.take() {
            self.cancellation_token.cancel();
            match tokio::time::timeout(timeout, handle).await {
                Ok(result) => result.map(Some),
                Err(_elapsed) => {
                    tracing::warn!(
                        "close_with_timeout: cleanup did not complete within {:?}",
                        timeout
                    );
                    Ok(None)
                }
            }
        } else {
            Ok(Some(QuitReason::Closed))
        }
    }

    /// Cancel the service and wait for cleanup to complete.
    ///
    /// This consumes the `RunningService` and ensures the connection is properly
    /// closed. For a non-consuming alternative, see [`close`](Self::close).
    pub async fn cancel(mut self) -> Result<QuitReason, tokio::task::JoinError> {
        // Swap in a fresh drop guard; dropping the old one cancels the token,
        // and `close()` then waits for the background task to finish.
        let _ = std::mem::replace(&mut self.dg, self.cancellation_token.clone().drop_guard());
        self.close().await
    }
}

impl<R: ServiceRole, S: Service<R>> Drop for RunningService<R, S> {
    fn drop(&mut self) {
        if self.handle.is_some() && !self.cancellation_token.is_cancelled() {
            tracing::debug!(
                "RunningService dropped without explicit close(). \
                 The connection will be closed asynchronously. \
                 For guaranteed cleanup, call close() or cancel() before dropping."
            );
        }
        // The DropGuard will handle cancellation.
    }
}

// Use a wrapper type so we can tweak the implementation if needed.
pub struct RunningServiceCancellationToken(CancellationToken);

impl RunningServiceCancellationToken {
    pub fn cancel(self) {
        self.0.cancel();
    }
}

#[derive(Debug)]
pub enum QuitReason {
    Cancelled,
    Closed,
    JoinError(tokio::task::JoinError),
}

/// Request execution context
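///
/// A minimal sketch of a request handler that honors cancellation; the role
/// type, `do_work`, the response type, and the exact [`McpError`] constructor
/// are placeholders.
///
/// ```rust,ignore
/// async fn handle(ctx: RequestContext<RoleServer>) -> Result<Response, McpError> {
///     tokio::select! {
///         _ = ctx.ct.cancelled() => Err(McpError::internal_error("cancelled", None)),
///         result = do_work() => result,
///     }
/// }
/// ```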
#[derive(Debug, Clone)]
pub struct RequestContext<R: ServiceRole> {
    /// This token will be cancelled when a [`CancelledNotification`] is received.
    pub ct: CancellationToken,
    pub id: RequestId,
    pub meta: Meta,
    pub extensions: Extensions,
    /// An interface for communicating with the remote client or server
    pub peer: Peer<R>,
}

/// Notification execution context
#[derive(Debug, Clone)]
pub struct NotificationContext<R: ServiceRole> {
    pub meta: Meta,
    pub extensions: Extensions,
    /// An interface for communicating with the remote client or server
    pub peer: Peer<R>,
}

/// Use this function to skip the initialization process.
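///
/// A minimal sketch; `my_service`, `transport`, and `peer_info` are
/// placeholders, and `peer_info` should describe an already-initialized peer.
///
/// ```rust,ignore
/// let running = serve_directly(my_service, transport, Some(peer_info));
/// ```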
pub fn serve_directly<R, S, T, E, A>(
    service: S,
    transport: T,
    peer_info: Option<R::PeerInfo>,
) -> RunningService<R, S>
where
    R: ServiceRole,
    S: Service<R>,
    T: IntoTransport<R, E, A>,
    E: std::error::Error + Send + Sync + 'static,
{
    serve_directly_with_ct(service, transport, peer_info, Default::default())
}

/// Use this function to skip the initialization process.
pub fn serve_directly_with_ct<R, S, T, E, A>(
    service: S,
    transport: T,
    peer_info: Option<R::PeerInfo>,
    ct: CancellationToken,
) -> RunningService<R, S>
where
    R: ServiceRole,
    S: Service<R>,
    T: IntoTransport<R, E, A>,
    E: std::error::Error + Send + Sync + 'static,
{
    let (peer, peer_rx) = Peer::new(Arc::new(AtomicU32RequestIdProvider::default()), peer_info);
    serve_inner(service, transport.into_transport(), peer, peer_rx, ct)
}

#[instrument(skip_all)]
fn serve_inner<R, S, T>(
    service: S,
    transport: T,
    peer: Peer<R>,
    mut peer_rx: tokio::sync::mpsc::Receiver<PeerSinkMessage<R>>,
    ct: CancellationToken,
) -> RunningService<R, S>
where
    R: ServiceRole,
    S: Service<R>,
    T: Transport<R> + 'static,
{
    const SINK_PROXY_BUFFER_SIZE: usize = 64;
    let (sink_proxy_tx, mut sink_proxy_rx) =
        tokio::sync::mpsc::channel::<TxJsonRpcMessage<R>>(SINK_PROXY_BUFFER_SIZE);
    let peer_info = peer.peer_info();
    if R::IS_CLIENT {
        tracing::info!(?peer_info, "Service initialized as client");
    } else {
        tracing::info!(?peer_info, "Service initialized as server");
    }

    let mut local_responder_pool =
        HashMap::<RequestId, Responder<Result<R::PeerResp, ServiceError>>>::new();
    let mut local_ct_pool = HashMap::<RequestId, CancellationToken>::new();
    let shared_service = Arc::new(service);
    // Keep a handle to hand back to the caller in `RunningService`.
    let service = shared_service.clone();

    let serve_loop_ct = ct.child_token();
    let peer_return: Peer<R> = peer.clone();
    let current_span = tracing::Span::current();
    let handle = tokio::spawn(async move {
        let mut transport = transport.into_transport();
        let mut batch_messages = VecDeque::<RxJsonRpcMessage<R>>::new();
        let mut send_task_set = tokio::task::JoinSet::<SendTaskResult>::new();
        #[derive(Debug)]
        enum SendTaskResult {
            Request {
                id: RequestId,
                result: Result<(), DynamicTransportError>,
            },
            Notification {
                responder: Responder<Result<(), ServiceError>>,
                cancellation_param: Option<CancelledNotificationParam>,
                result: Result<(), DynamicTransportError>,
            },
        }
        #[derive(Debug)]
        enum Event<R: ServiceRole> {
            ProxyMessage(PeerSinkMessage<R>),
            PeerMessage(RxJsonRpcMessage<R>),
            ToSink(TxJsonRpcMessage<R>),
            SendTaskResult(SendTaskResult),
        }

        let quit_reason = loop {
            let evt = if let Some(m) = batch_messages.pop_front() {
                Event::PeerMessage(m)
            } else {
                tokio::select! {
                    m = sink_proxy_rx.recv() => {
                        if let Some(m) = m {
                            Event::ToSink(m)
                        } else {
                            continue
                        }
                    }
                    m = transport.receive() => {
                        if let Some(m) = m {
                            Event::PeerMessage(m)
                        } else {
                            // [PATCH: ra0x3/mcpkit-rs] Drain pending messages on EOF to fix a tokio 1.36 race condition.
                            // The input stream closed, but don't break immediately; keep processing pending messages from sink_proxy_rx.
                            tracing::info!("input stream terminated, draining pending messages");

                            // Give spawned tasks time to send their messages.
                            tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;

                            // Process any pending messages in sink_proxy_rx.
                            while let Ok(m) = sink_proxy_rx.try_recv() {
                                tracing::debug!("Processing pending message after input close");
                                if let Err(e) = transport.send(m).await {
                                    tracing::error!("Failed to send pending message: {:?}", e);
                                }
                            }

                            break QuitReason::Closed
                        }
                    }
                    m = peer_rx.recv() => {
                        if let Some(m) = m {
                            Event::ProxyMessage(m)
                        } else {
                            continue
                        }
                    }
                    m = send_task_set.join_next(), if !send_task_set.is_empty() => {
                        let Some(result) = m else {
                            continue
                        };
                        match result {
                            Err(e) => {
                                // A join error is serious; quit the loop.
                                tracing::error!(%e, "send request task encountered a tokio join error");
                                break QuitReason::JoinError(e)
                            }
                            Ok(result) => {
                                Event::SendTaskResult(result)
                            }
                        }
                    }
                    _ = serve_loop_ct.cancelled() => {
                        tracing::info!("task cancelled");
                        break QuitReason::Cancelled
                    }
                }
            };

            tracing::trace!(?evt, "new event");
            match evt {
                Event::SendTaskResult(SendTaskResult::Request { id, result }) => {
                    if let Err(e) = result {
                        if let Some(responder) = local_responder_pool.remove(&id) {
                            let _ = responder.send(Err(ServiceError::TransportSend(e)));
                        }
                    }
                }
                Event::SendTaskResult(SendTaskResult::Notification {
                    responder,
                    result,
                    cancellation_param,
                }) => {
                    let response = if let Err(e) = result {
                        Err(ServiceError::TransportSend(e))
                    } else {
                        Ok(())
                    };
                    let _ = responder.send(response);
                    if let Some(param) = cancellation_param {
                        if let Some(responder) = local_responder_pool.remove(&param.request_id) {
                            tracing::info!(id = %param.request_id, reason = param.reason, "cancelled");
                            let _response_result = responder.send(Err(ServiceError::Cancelled {
                                reason: param.reason.clone(),
                            }));
                        }
                    }
                }
                // Responses and errors produced locally, heading to the peer.
                Event::ToSink(m) => {
                    if let Some(id) = match &m {
                        JsonRpcMessage::Response(response) => Some(&response.id),
                        JsonRpcMessage::Error(error) => Some(&error.id),
                        _ => None,
                    } {
                        if let Some(ct) = local_ct_pool.remove(id) {
                            ct.cancel();
                        }
                        // [PATCH: ra0x3/mcpkit-rs] Send responses inline to fix a tokio 1.36 race condition.
                        // Send directly to the transport instead of spawning, so the response goes out immediately.
                        let send_result = transport.send(m).await;
                        if let Err(error) = send_result {
                            tracing::error!(%error, "failed to send response message");
                        } else {
                            tracing::info!("successfully sent response to transport");
                        }
                    }
                }
                Event::ProxyMessage(PeerSinkMessage::Request {
                    request,
                    id,
                    responder,
                }) => {
                    local_responder_pool.insert(id.clone(), responder);
                    let send = transport.send(JsonRpcMessage::request(request, id.clone()));
                    {
                        let id = id.clone();
                        let current_span = tracing::Span::current();
                        send_task_set.spawn(send.map(move |r| SendTaskResult::Request {
                            id,
                            result: r.map_err(DynamicTransportError::new::<T, R>),
                        }).instrument(current_span));
                    }
                }
                Event::ProxyMessage(PeerSinkMessage::Notification {
                    notification,
                    responder,
                }) => {
                    // catch cancellation notification
                    let mut cancellation_param = None;
                    let notification = match notification.try_into() {
                        Ok::<CancelledNotification, _>(cancelled) => {
                            cancellation_param.replace(cancelled.params.clone());
                            cancelled.into()
                        }
                        Err(notification) => notification,
                    };
                    let send = transport.send(JsonRpcMessage::notification(notification));
                    let current_span = tracing::Span::current();
                    send_task_set.spawn(send.map(move |result| SendTaskResult::Notification {
                        responder,
                        cancellation_param,
                        result: result.map_err(DynamicTransportError::new::<T, R>),
                    }).instrument(current_span));
                }
                Event::PeerMessage(JsonRpcMessage::Request(JsonRpcRequest {
                    id,
                    mut request,
                    ..
                })) => {
                    tracing::debug!(%id, ?request, "received request");
                    {
                        let service = shared_service.clone();
                        let sink = sink_proxy_tx.clone();
                        let request_ct = serve_loop_ct.child_token();
                        let context_ct = request_ct.child_token();
                        local_ct_pool.insert(id.clone(), request_ct);
                        let mut extensions = Extensions::new();
                        let mut meta = Meta::new();
                        // Avoid cloning: swap the meta out first, otherwise the progress token would be lost.
                        std::mem::swap(&mut meta, request.get_meta_mut());
                        std::mem::swap(&mut extensions, request.extensions_mut());
                        let context = RequestContext {
                            ct: context_ct,
                            id: id.clone(),
                            peer: peer.clone(),
                            meta,
                            extensions,
                        };
                        // [PATCH: ra0x3/mcpkit-rs] Handle requests inline to fix a tokio 1.36 race condition.
                        // Handle requests inline instead of spawning. This ensures responses are sent before the
                        // event loop potentially blocks waiting for more input. In tokio 1.46+, spawning works fine,
                        // but in 1.36 the spawned task might not run if the event loop is blocked on input.
                        let result = service.handle_request(request, context).await;
                        let response = match result {
                            Ok(result) => {
                                tracing::debug!(%id, ?result, "response message");
                                JsonRpcMessage::response(result, id.clone())
                            }
                            Err(error) => {
                                tracing::warn!(%id, ?error, "response error");
                                JsonRpcMessage::error(error, id.clone())
                            }
                        };
                        let _send_result = sink.send(response).await;
                        tracing::info!(%id, "sent response to channel");
                    }
                }
                Event::PeerMessage(JsonRpcMessage::Notification(JsonRpcNotification {
                    notification,
                    ..
                })) => {
                    tracing::info!(?notification, "received notification");
                    // Catch cancellation notifications and cancel the matching in-flight request.
                    let mut notification = match notification.try_into() {
                        Ok::<CancelledNotification, _>(cancelled) => {
                            if let Some(ct) = local_ct_pool.remove(&cancelled.params.request_id) {
                                tracing::info!(id = %cancelled.params.request_id, reason = cancelled.params.reason, "cancelled");
                                ct.cancel();
                            }
                            cancelled.into()
                        }
                        Err(notification) => notification,
                    };
                    {
                        let service = shared_service.clone();
                        let mut extensions = Extensions::new();
                        let mut meta = Meta::new();
                        // Avoid cloning by swapping the meta and extensions out of the notification.
                        std::mem::swap(&mut extensions, notification.extensions_mut());
                        std::mem::swap(&mut meta, notification.get_meta_mut());
                        let context = NotificationContext {
                            peer: peer.clone(),
                            meta,
                            extensions,
                        };
                        // [PATCH: ra0x3/mcpkit-rs] Handle notifications inline to fix a tokio 1.36 race condition.
                        // Handle notifications inline instead of spawning to ensure they are processed immediately.
                        let result = service.handle_notification(notification, context).await;
                        if let Err(error) = result {
                            tracing::warn!(%error, "error handling notification");
                        }
                    }
                }
                Event::PeerMessage(JsonRpcMessage::Response(JsonRpcResponse {
                    result,
                    id,
                    ..
                })) => {
                    if let Some(responder) = local_responder_pool.remove(&id) {
                        let response_result = responder.send(Ok(result));
                        if let Err(_error) = response_result {
                            tracing::warn!(%id, "failed to deliver response; requester dropped");
                        }
                    }
                }
                Event::PeerMessage(JsonRpcMessage::Error(JsonRpcError { error, id, .. })) => {
                    if let Some(responder) = local_responder_pool.remove(&id) {
                        let _response_result = responder.send(Err(ServiceError::McpError(error)));
                        if let Err(_error) = _response_result {
                            tracing::warn!(%id, "failed to deliver error response; requester dropped");
                        }
                    }
                }
            }
        };

        // Give any final transport sends time to complete.
        tokio::task::yield_now().await;

        let sink_close_result = transport.close().await;
        if let Err(e) = sink_close_result {
            tracing::error!(%e, "failed to close transport");
        }
        tracing::info!(?quit_reason, "serve finished");
        quit_reason
    }.instrument(current_span));
    RunningService {
        service,
        peer: peer_return,
        handle: Some(handle),
        cancellation_token: ct.clone(),
        dg: ct.drop_guard(),
    }
}