1use futures::{FutureExt, future::BoxFuture};
2use thiserror::Error;
3
4#[cfg(feature = "server")]
5use crate::model::ServerJsonRpcMessage;
6use crate::{
7 error::ErrorData as McpError,
8 model::{
9 CancelledNotification, CancelledNotificationParam, Extensions, GetExtensions, GetMeta,
10 JsonRpcError, JsonRpcMessage, JsonRpcNotification, JsonRpcRequest, JsonRpcResponse, Meta,
11 NumberOrString, ProgressToken, RequestId,
12 },
13 transport::{DynamicTransportError, IntoTransport, Transport},
14};
15#[cfg(feature = "client")]
16#[cfg_attr(docsrs, doc(cfg(feature = "client")))]
17mod client;
18#[cfg(feature = "client")]
19#[cfg_attr(docsrs, doc(cfg(feature = "client")))]
20pub use client::*;
21#[cfg(feature = "server")]
22#[cfg_attr(docsrs, doc(cfg(feature = "server")))]
23mod server;
24#[cfg(feature = "server")]
25#[cfg_attr(docsrs, doc(cfg(feature = "server")))]
26pub use server::*;
27#[cfg(feature = "tower")]
28#[cfg_attr(docsrs, doc(cfg(feature = "tower")))]
29mod tower;
30use tokio_util::sync::{CancellationToken, DropGuard};
31#[cfg(feature = "tower")]
32#[cfg_attr(docsrs, doc(cfg(feature = "tower")))]
33pub use tower::*;
34use tracing::{Instrument as _, instrument};
/// Errors reported by the service layer when exchanging messages with the peer.
///
/// The enum is `#[non_exhaustive]`, so matches outside this crate need a wildcard arm.
#[derive(Error, Debug)]
#[non_exhaustive]
pub enum ServiceError {
    /// The peer answered a request with a JSON-RPC error message.
    #[error("Mcp error: {0}")]
    McpError(McpError),
    /// The transport failed while sending an outgoing request or notification.
    #[error("Transport send error: {0}")]
    TransportSend(DynamicTransportError),
    /// The queue towards the serve loop, or the per-request reply channel, was closed.
    #[error("Transport closed")]
    TransportClosed,
    /// NOTE(review): not constructed anywhere in this file; presumably raised by callers
    /// that receive a response of a different kind than expected — confirm.
    #[error("Unexpected response type")]
    UnexpectedResponse,
    /// A pending request was cancelled after a cancellation notification for it was sent;
    /// `reason` is the text carried by that notification, if any.
    #[error("task cancelled for reason {}", reason.as_deref().unwrap_or("<unknown>"))]
    Cancelled { reason: Option<String> },
    /// No response arrived within the `timeout` configured for the request.
    #[error("request timeout after {}", chrono::Duration::from_std(*timeout).unwrap_or_default())]
    Timeout { timeout: Duration },
}
51
52trait TransferObject:
53 std::fmt::Debug + Clone + serde::Serialize + serde::de::DeserializeOwned + Send + Sync + 'static
54{
55}
56
57impl<T> TransferObject for T where
58 T: std::fmt::Debug
59 + serde::Serialize
60 + serde::de::DeserializeOwned
61 + Send
62 + Sync
63 + 'static
64 + Clone
65{
66}
67
/// Describes one side of a connection by naming the message types it sends and receives.
///
/// `Req`/`Resp`/`Not` are the types this side sends (see [`TxJsonRpcMessage`]);
/// `PeerReq`/`PeerResp`/`PeerNot` are the types it receives (see [`RxJsonRpcMessage`]).
#[allow(private_bounds, reason = "there's no the third implementation")]
pub trait ServiceRole: std::fmt::Debug + Send + Sync + 'static + Copy + Clone {
    /// Request type sent to the peer; its meta is filled in before sending.
    type Req: TransferObject + GetMeta + GetExtensions;
    /// Response type produced by local request handlers.
    type Resp: TransferObject;
    /// Notification type sent to the peer. It must be convertible from and (fallibly)
    /// to a [`CancelledNotification`]; a failed conversion hands back the original value.
    type Not: TryInto<CancelledNotification, Error = Self::Not>
        + From<CancelledNotification>
        + TransferObject;
    /// Request type received from the peer.
    type PeerReq: TransferObject + GetMeta + GetExtensions;
    /// Response type received from the peer for requests sent by this side.
    type PeerResp: TransferObject;
    /// Notification type received from the peer, with the same cancellation
    /// conversions as [`ServiceRole::Not`].
    type PeerNot: TryInto<CancelledNotification, Error = Self::PeerNot>
        + From<CancelledNotification>
        + TransferObject
        + GetMeta
        + GetExtensions;
    /// Error type returned by the `serve` family of [`ServiceExt`] when start-up fails.
    type InitializeError;
    /// `true` for the client role; used for log messages and `Debug` output in this file.
    const IS_CLIENT: bool;
    /// Information this side reports about itself (see [`Service::get_info`]).
    type Info: TransferObject;
    /// Information stored about the peer (see [`Peer::peer_info`]).
    type PeerInfo: TransferObject;
}
87
/// JSON-RPC message as sent by role `R`: its own request, response and notification types.
pub type TxJsonRpcMessage<R> =
    JsonRpcMessage<<R as ServiceRole>::Req, <R as ServiceRole>::Resp, <R as ServiceRole>::Not>;
/// JSON-RPC message as received by role `R`: the peer's request, response and notification types.
pub type RxJsonRpcMessage<R> = JsonRpcMessage<
    <R as ServiceRole>::PeerReq,
    <R as ServiceRole>::PeerResp,
    <R as ServiceRole>::PeerNot,
>;
95
/// Application logic for role `R`: reacts to what the peer sends.
pub trait Service<R: ServiceRole>: Send + Sync + 'static {
    /// Handles a request received from the peer.
    ///
    /// The `Ok` value is sent back as the response, the `Err` value as a JSON-RPC error.
    /// `context` carries the request id, meta, extensions, a cancellation token and a
    /// [`Peer`] handle.
    fn handle_request(
        &self,
        request: R::PeerReq,
        context: RequestContext<R>,
    ) -> impl Future<Output = Result<R::Resp, McpError>> + Send + '_;
    /// Handles a notification received from the peer.
    ///
    /// Nothing is sent back; an `Err` is only logged by the serve loop.
    fn handle_notification(
        &self,
        notification: R::PeerNot,
        context: NotificationContext<R>,
    ) -> impl Future<Output = Result<(), McpError>> + Send + '_;
    /// Returns this side's [`ServiceRole::Info`].
    fn get_info(&self) -> R::Info;
}
109
110pub trait ServiceExt<R: ServiceRole>: Service<R> + Sized {
111 fn into_dyn(self) -> Box<dyn DynService<R>> {
115 Box::new(self)
116 }
117 fn serve<T, E, A>(
118 self,
119 transport: T,
120 ) -> impl Future<Output = Result<RunningService<R, Self>, R::InitializeError>> + Send
121 where
122 T: IntoTransport<R, E, A>,
123 E: std::error::Error + Send + Sync + 'static,
124 Self: Sized,
125 {
126 Self::serve_with_ct(self, transport, Default::default())
127 }
128 fn serve_with_ct<T, E, A>(
129 self,
130 transport: T,
131 ct: CancellationToken,
132 ) -> impl Future<Output = Result<RunningService<R, Self>, R::InitializeError>> + Send
133 where
134 T: IntoTransport<R, E, A>,
135 E: std::error::Error + Send + Sync + 'static,
136 Self: Sized;
137}
138
139impl<R: ServiceRole> Service<R> for Box<dyn DynService<R>> {
140 fn handle_request(
141 &self,
142 request: R::PeerReq,
143 context: RequestContext<R>,
144 ) -> impl Future<Output = Result<R::Resp, McpError>> + Send + '_ {
145 DynService::handle_request(self.as_ref(), request, context)
146 }
147
148 fn handle_notification(
149 &self,
150 notification: R::PeerNot,
151 context: NotificationContext<R>,
152 ) -> impl Future<Output = Result<(), McpError>> + Send + '_ {
153 DynService::handle_notification(self.as_ref(), notification, context)
154 }
155
156 fn get_info(&self) -> R::Info {
157 DynService::get_info(self.as_ref())
158 }
159}
160
/// Object-safe counterpart of [`Service`]: the same three operations, with the
/// handler futures returned as boxed futures so the trait can be used as `dyn`.
pub trait DynService<R: ServiceRole>: Send + Sync {
    /// Boxed form of [`Service::handle_request`].
    fn handle_request(
        &self,
        request: R::PeerReq,
        context: RequestContext<R>,
    ) -> BoxFuture<'_, Result<R::Resp, McpError>>;
    /// Boxed form of [`Service::handle_notification`].
    fn handle_notification(
        &self,
        notification: R::PeerNot,
        context: NotificationContext<R>,
    ) -> BoxFuture<'_, Result<(), McpError>>;
    /// Same as [`Service::get_info`].
    fn get_info(&self) -> R::Info;
}
174
175impl<R: ServiceRole, S: Service<R>> DynService<R> for S {
176 fn handle_request(
177 &self,
178 request: R::PeerReq,
179 context: RequestContext<R>,
180 ) -> BoxFuture<'_, Result<R::Resp, McpError>> {
181 Box::pin(self.handle_request(request, context))
182 }
183 fn handle_notification(
184 &self,
185 notification: R::PeerNot,
186 context: NotificationContext<R>,
187 ) -> BoxFuture<'_, Result<(), McpError>> {
188 Box::pin(self.handle_notification(notification, context))
189 }
190 fn get_info(&self) -> R::Info {
191 self.get_info()
192 }
193}
194
195use std::{
196 collections::{HashMap, VecDeque},
197 ops::Deref,
198 sync::{Arc, atomic::AtomicU64},
199 time::Duration,
200};
201
202use tokio::sync::mpsc;
203
/// Source of ids for outgoing requests; `Peer` asks it for one id per request sent.
pub trait RequestIdProvider: Send + Sync + 'static {
    /// Returns the id to use for the next outgoing request.
    fn next_request_id(&self) -> RequestId;
}

/// Source of progress tokens; `Peer` attaches one token to the meta of each outgoing request.
pub trait ProgressTokenProvider: Send + Sync + 'static {
    /// Returns the progress token to attach to the next outgoing request.
    fn next_progress_token(&self) -> ProgressToken;
}
211
/// Counter-based [`RequestIdProvider`].
pub type AtomicU32RequestIdProvider = AtomicU32Provider;
/// Counter-based [`ProgressTokenProvider`].
pub type AtomicU32ProgressTokenProvider = AtomicU32Provider;

/// Hands out consecutive numbers starting at zero from an atomic counter.
///
/// NOTE(review): despite the `U32` in the name, the counter is an `AtomicU64`.
#[derive(Debug, Default)]
pub struct AtomicU32Provider {
    // Next number to hand out.
    id: AtomicU64,
}
219
220impl RequestIdProvider for AtomicU32Provider {
221 fn next_request_id(&self) -> RequestId {
222 let id = self.id.fetch_add(1, std::sync::atomic::Ordering::SeqCst);
223 RequestId::Number(id as i64)
225 }
226}
227
228impl ProgressTokenProvider for AtomicU32Provider {
229 fn next_progress_token(&self) -> ProgressToken {
230 let id = self.id.fetch_add(1, std::sync::atomic::Ordering::SeqCst);
231 ProgressToken(NumberOrString::Number(id as i64))
232 }
233}
234
/// Sending half of a one-shot channel used to deliver a single result back to a waiting caller.
type Responder<T> = tokio::sync::oneshot::Sender<T>;

/// Handle to a request that has been queued for sending and is awaiting its response.
///
/// Obtained from [`Peer::send_request_with_option`]; consume it with
/// [`RequestHandle::await_response`] or [`RequestHandle::cancel`].
#[derive(Debug)]
pub struct RequestHandle<R: ServiceRole> {
    /// Receives the peer's response, or the error that ended the request.
    pub rx: tokio::sync::oneshot::Receiver<Result<R::PeerResp, ServiceError>>,
    /// Options the request was sent with (notably the optional timeout).
    pub options: PeerRequestOptions,
    /// Peer handle used to send the cancellation notification.
    pub peer: Peer<R>,
    /// Id assigned to the request.
    pub id: RequestId,
    /// Progress token that was written into the request's meta.
    pub progress_token: ProgressToken,
}
250
251impl<R: ServiceRole> RequestHandle<R> {
252 pub const REQUEST_TIMEOUT_REASON: &str = "request timeout";
253 pub async fn await_response(self) -> Result<R::PeerResp, ServiceError> {
254 if let Some(timeout) = self.options.timeout {
255 let timeout_result = tokio::time::timeout(timeout, async move {
256 self.rx.await.map_err(|_e| ServiceError::TransportClosed)?
257 })
258 .await;
259 match timeout_result {
260 Ok(response) => response,
261 Err(_) => {
262 let error = Err(ServiceError::Timeout { timeout });
263 let notification = CancelledNotification {
265 params: CancelledNotificationParam {
266 request_id: self.id,
267 reason: Some(Self::REQUEST_TIMEOUT_REASON.to_owned()),
268 },
269 method: crate::model::CancelledNotificationMethod,
270 extensions: Default::default(),
271 };
272 let _ = self.peer.send_notification(notification.into()).await;
273 error
274 }
275 }
276 } else {
277 self.rx.await.map_err(|_e| ServiceError::TransportClosed)?
278 }
279 }
280
281 pub async fn cancel(self, reason: Option<String>) -> Result<(), ServiceError> {
283 let notification = CancelledNotification {
284 params: CancelledNotificationParam {
285 request_id: self.id,
286 reason,
287 },
288 method: crate::model::CancelledNotificationMethod,
289 extensions: Default::default(),
290 };
291 self.peer.send_notification(notification.into()).await?;
292 Ok(())
293 }
294}
295
/// Work item queued by a [`Peer`] for the serve loop to send over the transport.
#[derive(Debug)]
pub(crate) enum PeerSinkMessage<R: ServiceRole> {
    /// An outgoing request.
    Request {
        /// The request payload.
        request: R::Req,
        /// Id under which the request is sent and its response is matched.
        id: RequestId,
        /// Receives the peer's response, or the error that ended the request.
        responder: Responder<Result<R::PeerResp, ServiceError>>,
    },
    /// An outgoing notification.
    Notification {
        /// The notification payload.
        notification: R::Not,
        /// Receives the outcome of handing the notification to the transport.
        responder: Responder<Result<(), ServiceError>>,
    },
}
308
/// Cloneable handle for sending requests and notifications to the other side.
///
/// Messages are queued on a channel that the serve loop drains; clones share the
/// same queue, id/token providers and peer info.
#[derive(Clone)]
pub struct Peer<R: ServiceRole> {
    // Queue towards the serve loop.
    tx: mpsc::Sender<PeerSinkMessage<R>>,
    // Allocates ids for outgoing requests.
    request_id_provider: Arc<dyn RequestIdProvider>,
    // Allocates progress tokens for outgoing requests.
    progress_token_provider: Arc<dyn ProgressTokenProvider>,
    // Information about the peer; can be set at most once.
    info: Arc<tokio::sync::OnceCell<R::PeerInfo>>,
}
321
322impl<R: ServiceRole> std::fmt::Debug for Peer<R> {
323 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
324 f.debug_struct("PeerSink")
325 .field("tx", &self.tx)
326 .field("is_client", &R::IS_CLIENT)
327 .finish()
328 }
329}
330
/// Receiving end of a [`Peer`]'s queue, consumed by the serve loop.
type ProxyOutbound<R> = mpsc::Receiver<PeerSinkMessage<R>>;

/// Per-request options for [`Peer::send_request_with_option`].
#[derive(Debug, Default)]
pub struct PeerRequestOptions {
    /// How long [`RequestHandle::await_response`] waits before giving up; `None` waits indefinitely.
    pub timeout: Option<Duration>,
    /// Extra meta entries merged into the request's meta before sending.
    pub meta: Option<Meta>,
}
338
339impl PeerRequestOptions {
340 pub fn no_options() -> Self {
341 Self::default()
342 }
343}
344
345impl<R: ServiceRole> Peer<R> {
346 const CLIENT_CHANNEL_BUFFER_SIZE: usize = 1024;
347 pub(crate) fn new(
348 request_id_provider: Arc<dyn RequestIdProvider>,
349 peer_info: Option<R::PeerInfo>,
350 ) -> (Peer<R>, ProxyOutbound<R>) {
351 let (tx, rx) = mpsc::channel(Self::CLIENT_CHANNEL_BUFFER_SIZE);
352 (
353 Self {
354 tx,
355 request_id_provider,
356 progress_token_provider: Arc::new(AtomicU32ProgressTokenProvider::default()),
357 info: Arc::new(tokio::sync::OnceCell::new_with(peer_info)),
358 },
359 rx,
360 )
361 }
362 pub async fn send_notification(&self, notification: R::Not) -> Result<(), ServiceError> {
363 let (responder, receiver) = tokio::sync::oneshot::channel();
364 self.tx
365 .send(PeerSinkMessage::Notification {
366 notification,
367 responder,
368 })
369 .await
370 .map_err(|_m| ServiceError::TransportClosed)?;
371 receiver.await.map_err(|_e| ServiceError::TransportClosed)?
372 }
373 pub async fn send_request(&self, request: R::Req) -> Result<R::PeerResp, ServiceError> {
374 self.send_request_with_option(request, PeerRequestOptions::no_options())
375 .await?
376 .await_response()
377 .await
378 }
379
380 pub async fn send_cancellable_request(
381 &self,
382 request: R::Req,
383 options: PeerRequestOptions,
384 ) -> Result<RequestHandle<R>, ServiceError> {
385 self.send_request_with_option(request, options).await
386 }
387
388 pub async fn send_request_with_option(
389 &self,
390 mut request: R::Req,
391 options: PeerRequestOptions,
392 ) -> Result<RequestHandle<R>, ServiceError> {
393 let id = self.request_id_provider.next_request_id();
394 let progress_token = self.progress_token_provider.next_progress_token();
395 request
396 .get_meta_mut()
397 .set_progress_token(progress_token.clone());
398 if let Some(meta) = options.meta.clone() {
399 request.get_meta_mut().extend(meta);
400 }
401 let (responder, receiver) = tokio::sync::oneshot::channel();
402 self.tx
403 .send(PeerSinkMessage::Request {
404 request,
405 id: id.clone(),
406 responder,
407 })
408 .await
409 .map_err(|_m| ServiceError::TransportClosed)?;
410 Ok(RequestHandle {
411 id,
412 rx: receiver,
413 progress_token,
414 options,
415 peer: self.clone(),
416 })
417 }
418 pub fn peer_info(&self) -> Option<&R::PeerInfo> {
419 self.info.get()
420 }
421
422 pub fn set_peer_info(&self, info: R::PeerInfo) {
423 if self.info.initialized() {
424 tracing::warn!("trying to set peer info, which is already initialized");
425 } else {
426 let _ = self.info.set(info);
427 }
428 }
429
430 pub fn is_transport_closed(&self) -> bool {
431 self.tx.is_closed()
432 }
433}
434
/// Handle to a service whose serve loop is running in a background task.
///
/// Dereferences to the [`Peer`], so peer methods can be called on it directly.
#[derive(Debug)]
pub struct RunningService<R: ServiceRole, S: Service<R>> {
    // The service instance, shared with the serve loop.
    service: Arc<S>,
    // Handle for sending to the other side.
    peer: Peer<R>,
    // Join handle of the serve loop; `None` once it has been taken by close/waiting.
    handle: Option<tokio::task::JoinHandle<QuitReason>>,
    // Token whose cancellation stops the serve loop.
    cancellation_token: CancellationToken,
    // Cancels the token when this value is dropped.
    dg: DropGuard,
}
443impl<R: ServiceRole, S: Service<R>> Deref for RunningService<R, S> {
444 type Target = Peer<R>;
445
446 fn deref(&self) -> &Self::Target {
447 &self.peer
448 }
449}
450
451impl<R: ServiceRole, S: Service<R>> RunningService<R, S> {
452 #[inline]
453 pub fn peer(&self) -> &Peer<R> {
454 &self.peer
455 }
456 #[inline]
457 pub fn service(&self) -> &S {
458 self.service.as_ref()
459 }
460 #[inline]
461 pub fn cancellation_token(&self) -> RunningServiceCancellationToken {
462 RunningServiceCancellationToken(self.cancellation_token.clone())
463 }
464
465 #[inline]
467 pub fn is_closed(&self) -> bool {
468 self.handle.is_none() || self.cancellation_token.is_cancelled()
469 }
470
471 #[inline]
476 pub async fn waiting(mut self) -> Result<QuitReason, tokio::task::JoinError> {
477 match self.handle.take() {
478 Some(handle) => handle.await,
479 None => Ok(QuitReason::Closed),
480 }
481 }
482
483 pub async fn close(&mut self) -> Result<QuitReason, tokio::task::JoinError> {
501 if let Some(handle) = self.handle.take() {
502 self.cancellation_token.cancel();
505 handle.await
506 } else {
507 Ok(QuitReason::Closed)
509 }
510 }
511
512 pub async fn close_with_timeout(
521 &mut self,
522 timeout: Duration,
523 ) -> Result<Option<QuitReason>, tokio::task::JoinError> {
524 if let Some(handle) = self.handle.take() {
525 self.cancellation_token.cancel();
526 match tokio::time::timeout(timeout, handle).await {
527 Ok(result) => result.map(Some),
528 Err(_elapsed) => {
529 tracing::warn!(
530 "close_with_timeout: cleanup did not complete within {:?}",
531 timeout
532 );
533 Ok(None)
534 }
535 }
536 } else {
537 Ok(Some(QuitReason::Closed))
538 }
539 }
540
541 pub async fn cancel(mut self) -> Result<QuitReason, tokio::task::JoinError> {
546 let _ = std::mem::replace(&mut self.dg, self.cancellation_token.clone().drop_guard());
548 self.close().await
549 }
550}
551
552impl<R: ServiceRole, S: Service<R>> Drop for RunningService<R, S> {
553 fn drop(&mut self) {
554 if self.handle.is_some() && !self.cancellation_token.is_cancelled() {
555 tracing::debug!(
556 "RunningService dropped without explicit close(). \
557 The connection will be closed asynchronously. \
558 For guaranteed cleanup, call close() or cancel() before dropping."
559 );
560 }
561 }
563}
564
565pub struct RunningServiceCancellationToken(CancellationToken);
567
568impl RunningServiceCancellationToken {
569 pub fn cancel(self) {
570 self.0.cancel();
571 }
572}
573
/// Why the serve loop stopped.
#[derive(Debug)]
pub enum QuitReason {
    /// The service's cancellation token was cancelled.
    Cancelled,
    /// The transport's incoming stream ended, or the service had already been closed.
    Closed,
    /// A task that was sending a message over the transport failed to join.
    JoinError(tokio::task::JoinError),
}
580
/// Context handed to [`Service::handle_request`] alongside the request.
#[derive(Debug, Clone)]
pub struct RequestContext<R: ServiceRole> {
    /// Cancelled when the peer cancels this request, when its response is sent,
    /// or when the serve loop is cancelled.
    pub ct: CancellationToken,
    /// Id of the request being handled.
    pub id: RequestId,
    /// Meta taken out of the request.
    pub meta: Meta,
    /// Extensions taken out of the request.
    pub extensions: Extensions,
    /// Handle for sending to the peer.
    pub peer: Peer<R>,
}
592
/// Context handed to [`Service::handle_notification`] alongside the notification.
#[derive(Debug, Clone)]
pub struct NotificationContext<R: ServiceRole> {
    /// Meta taken out of the notification.
    pub meta: Meta,
    /// Extensions taken out of the notification.
    pub extensions: Extensions,
    /// Handle for sending to the peer.
    pub peer: Peer<R>,
}
601
602pub fn serve_directly<R, S, T, E, A>(
604 service: S,
605 transport: T,
606 peer_info: Option<R::PeerInfo>,
607) -> RunningService<R, S>
608where
609 R: ServiceRole,
610 S: Service<R>,
611 T: IntoTransport<R, E, A>,
612 E: std::error::Error + Send + Sync + 'static,
613{
614 serve_directly_with_ct(service, transport, peer_info, Default::default())
615}
616
617pub fn serve_directly_with_ct<R, S, T, E, A>(
619 service: S,
620 transport: T,
621 peer_info: Option<R::PeerInfo>,
622 ct: CancellationToken,
623) -> RunningService<R, S>
624where
625 R: ServiceRole,
626 S: Service<R>,
627 T: IntoTransport<R, E, A>,
628 E: std::error::Error + Send + Sync + 'static,
629{
630 let (peer, peer_rx) = Peer::new(Arc::new(AtomicU32RequestIdProvider::default()), peer_info);
631 serve_inner(service, transport.into_transport(), peer, peer_rx, ct)
632}
633
634#[instrument(skip_all)]
635fn serve_inner<R, S, T>(
636 service: S,
637 transport: T,
638 peer: Peer<R>,
639 mut peer_rx: tokio::sync::mpsc::Receiver<PeerSinkMessage<R>>,
640 ct: CancellationToken,
641) -> RunningService<R, S>
642where
643 R: ServiceRole,
644 S: Service<R>,
645 T: Transport<R> + 'static,
646{
647 const SINK_PROXY_BUFFER_SIZE: usize = 64;
648 let (sink_proxy_tx, mut sink_proxy_rx) =
649 tokio::sync::mpsc::channel::<TxJsonRpcMessage<R>>(SINK_PROXY_BUFFER_SIZE);
650 let peer_info = peer.peer_info();
651 if R::IS_CLIENT {
652 tracing::info!(?peer_info, "Service initialized as client");
653 } else {
654 tracing::info!(?peer_info, "Service initialized as server");
655 }
656
657 let mut local_responder_pool =
658 HashMap::<RequestId, Responder<Result<R::PeerResp, ServiceError>>>::new();
659 let mut local_ct_pool = HashMap::<RequestId, CancellationToken>::new();
660 let shared_service = Arc::new(service);
661 let service = shared_service.clone();
663
664 let serve_loop_ct = ct.child_token();
667 let peer_return: Peer<R> = peer.clone();
668 let current_span = tracing::Span::current();
669 let handle = tokio::spawn(async move {
670 let mut transport = transport.into_transport();
671 let mut batch_messages = VecDeque::<RxJsonRpcMessage<R>>::new();
672 let mut send_task_set = tokio::task::JoinSet::<SendTaskResult>::new();
673 #[derive(Debug)]
674 enum SendTaskResult {
675 Request {
676 id: RequestId,
677 result: Result<(), DynamicTransportError>,
678 },
679 Notification {
680 responder: Responder<Result<(), ServiceError>>,
681 cancellation_param: Option<CancelledNotificationParam>,
682 result: Result<(), DynamicTransportError>,
683 },
684 }
685 #[derive(Debug)]
686 enum Event<R: ServiceRole> {
687 ProxyMessage(PeerSinkMessage<R>),
688 PeerMessage(RxJsonRpcMessage<R>),
689 ToSink(TxJsonRpcMessage<R>),
690 SendTaskResult(SendTaskResult),
691 }
692
693 let quit_reason = loop {
694 let evt = if let Some(m) = batch_messages.pop_front() {
695 Event::PeerMessage(m)
696 } else {
697 tokio::select! {
698 m = sink_proxy_rx.recv() => {
699 if let Some(m) = m {
700 Event::ToSink(m)
701 } else {
702 continue
703 }
704 }
705 m = transport.receive() => {
706 if let Some(m) = m {
707 Event::PeerMessage(m)
708 } else {
709 tracing::info!("input stream terminated, draining pending messages");
712
713 tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;
715
716 while let Ok(m) = sink_proxy_rx.try_recv() {
718 tracing::debug!("Processing pending message after input close");
719 if let Err(e) = transport.send(m).await {
720 tracing::error!("Failed to send pending message: {:?}", e);
721 }
722 }
723
724 break QuitReason::Closed
725 }
726 }
727 m = peer_rx.recv() => {
728 if let Some(m) = m {
729 Event::ProxyMessage(m)
730 } else {
731 continue
732 }
733 }
734 m = send_task_set.join_next(), if !send_task_set.is_empty() => {
735 let Some(result) = m else {
736 continue
737 };
738 match result {
739 Err(e) => {
740 tracing::error!(%e, "send request task encounter a tokio join error");
742 break QuitReason::JoinError(e)
743 }
744 Ok(result) => {
745 Event::SendTaskResult(result)
746 }
747 }
748 }
749 _ = serve_loop_ct.cancelled() => {
750 tracing::info!("task cancelled");
751 break QuitReason::Cancelled
752 }
753 }
754 };
755
756 tracing::trace!(?evt, "new event");
757 match evt {
758 Event::SendTaskResult(SendTaskResult::Request { id, result }) => {
759 if let Err(e) = result {
760 if let Some(responder) = local_responder_pool.remove(&id) {
761 let _ = responder.send(Err(ServiceError::TransportSend(e)));
762 }
763 }
764 }
765 Event::SendTaskResult(SendTaskResult::Notification {
766 responder,
767 result,
768 cancellation_param,
769 }) => {
770 let response = if let Err(e) = result {
771 Err(ServiceError::TransportSend(e))
772 } else {
773 Ok(())
774 };
775 let _ = responder.send(response);
776 if let Some(param) = cancellation_param {
777 if let Some(responder) = local_responder_pool.remove(¶m.request_id) {
778 tracing::info!(id = %param.request_id, reason = param.reason, "cancelled");
779 let _response_result = responder.send(Err(ServiceError::Cancelled {
780 reason: param.reason.clone(),
781 }));
782 }
783 }
784 }
785 Event::ToSink(m) => {
787 if let Some(id) = match &m {
788 JsonRpcMessage::Response(response) => Some(&response.id),
789 JsonRpcMessage::Error(error) => Some(&error.id),
790 _ => None,
791 } {
792 if let Some(ct) = local_ct_pool.remove(id) {
793 ct.cancel();
794 }
795 let send_result = transport.send(m).await;
798 if let Err(error) = send_result {
799 tracing::error!(%error, "fail to response message");
800 } else {
801 tracing::info!("Successfully sent response to transport");
802 }
803 }
804 }
805 Event::ProxyMessage(PeerSinkMessage::Request {
806 request,
807 id,
808 responder,
809 }) => {
810 local_responder_pool.insert(id.clone(), responder);
811 let send = transport.send(JsonRpcMessage::request(request, id.clone()));
812 {
813 let id = id.clone();
814 let current_span = tracing::Span::current();
815 send_task_set.spawn(send.map(move |r| SendTaskResult::Request {
816 id,
817 result: r.map_err(DynamicTransportError::new::<T, R>),
818 }).instrument(current_span));
819 }
820 }
821 Event::ProxyMessage(PeerSinkMessage::Notification {
822 notification,
823 responder,
824 }) => {
825 let mut cancellation_param = None;
827 let notification = match notification.try_into() {
828 Ok::<CancelledNotification, _>(cancelled) => {
829 cancellation_param.replace(cancelled.params.clone());
830 cancelled.into()
831 }
832 Err(notification) => notification,
833 };
834 let send = transport.send(JsonRpcMessage::notification(notification));
835 let current_span = tracing::Span::current();
836 send_task_set.spawn(send.map(move |result| SendTaskResult::Notification {
837 responder,
838 cancellation_param,
839 result: result.map_err(DynamicTransportError::new::<T, R>),
840 }).instrument(current_span));
841 }
842 Event::PeerMessage(JsonRpcMessage::Request(JsonRpcRequest {
843 id,
844 mut request,
845 ..
846 })) => {
847 tracing::debug!(%id, ?request, "received request");
848 {
849 let service = shared_service.clone();
850 let sink = sink_proxy_tx.clone();
851 let request_ct = serve_loop_ct.child_token();
852 let context_ct = request_ct.child_token();
853 local_ct_pool.insert(id.clone(), request_ct);
854 let mut extensions = Extensions::new();
855 let mut meta = Meta::new();
856 std::mem::swap(&mut meta, request.get_meta_mut());
859 std::mem::swap(&mut extensions, request.extensions_mut());
860 let context = RequestContext {
861 ct: context_ct,
862 id: id.clone(),
863 peer: peer.clone(),
864 meta,
865 extensions,
866 };
867 let result = service.handle_request(request, context).await;
872 let response = match result {
873 Ok(result) => {
874 tracing::debug!(%id, ?result, "response message");
875 JsonRpcMessage::response(result, id.clone())
876 }
877 Err(error) => {
878 tracing::warn!(%id, ?error, "response error");
879 JsonRpcMessage::error(error, id.clone())
880 }
881 };
882 let _send_result = sink.send(response).await;
883 tracing::info!("Sent response to channel for id={}", id);
884 }
885 }
886 Event::PeerMessage(JsonRpcMessage::Notification(JsonRpcNotification {
887 notification,
888 ..
889 })) => {
890 tracing::info!(?notification, "received notification");
891 let mut notification = match notification.try_into() {
893 Ok::<CancelledNotification, _>(cancelled) => {
894 if let Some(ct) = local_ct_pool.remove(&cancelled.params.request_id) {
895 tracing::info!(id = %cancelled.params.request_id, reason = cancelled.params.reason, "cancelled");
896 ct.cancel();
897 }
898 cancelled.into()
899 }
900 Err(notification) => notification,
901 };
902 {
903 let service = shared_service.clone();
904 let mut extensions = Extensions::new();
905 let mut meta = Meta::new();
906 std::mem::swap(&mut extensions, notification.extensions_mut());
908 std::mem::swap(&mut meta, notification.get_meta_mut());
909 let context = NotificationContext {
910 peer: peer.clone(),
911 meta,
912 extensions,
913 };
914 let result = service.handle_notification(notification, context).await;
917 if let Err(error) = result {
918 tracing::warn!(%error, "Error sending notification");
919 }
920 }
921 }
922 Event::PeerMessage(JsonRpcMessage::Response(JsonRpcResponse {
923 result,
924 id,
925 ..
926 })) => {
927 if let Some(responder) = local_responder_pool.remove(&id) {
928 let response_result = responder.send(Ok(result));
929 if let Err(_error) = response_result {
930 tracing::warn!(%id, "Error sending response");
931 }
932 }
933 }
934 Event::PeerMessage(JsonRpcMessage::Error(JsonRpcError { error, id, .. })) => {
935 if let Some(responder) = local_responder_pool.remove(&id) {
936 let _response_result = responder.send(Err(ServiceError::McpError(error)));
937 if let Err(_error) = _response_result {
938 tracing::warn!(%id, "Error sending response");
939 }
940 }
941 }
942 }
943 };
944
945 tokio::task::yield_now().await;
947
948 let sink_close_result = transport.close().await;
949 if let Err(e) = sink_close_result {
950 tracing::error!(%e, "fail to close sink");
951 }
952 tracing::info!(?quit_reason, "serve finished");
953 quit_reason
954 }.instrument(current_span));
955 RunningService {
956 service,
957 peer: peer_return,
958 handle: Some(handle),
959 cancellation_token: ct.clone(),
960 dg: ct.drop_guard(),
961 }
962}