1use futures::{FutureExt, future::BoxFuture};
2use thiserror::Error;
3
4#[cfg(feature = "server")]
5use crate::model::ServerJsonRpcMessage;
6use crate::{
7 error::ErrorData as McpError,
8 model::{
9 CancelledNotification, CancelledNotificationParam, Extensions, GetExtensions, GetMeta,
10 JsonRpcError, JsonRpcMessage, JsonRpcNotification, JsonRpcRequest, JsonRpcResponse, Meta,
11 NumberOrString, ProgressToken, RequestId,
12 },
13 transport::{DynamicTransportError, IntoTransport, Transport},
14};
15#[cfg(feature = "client")]
16mod client;
17#[cfg(feature = "client")]
18pub use client::*;
19#[cfg(feature = "server")]
20mod server;
21#[cfg(feature = "server")]
22pub use server::*;
23#[cfg(feature = "tower")]
24mod tower;
25use tokio_util::sync::{CancellationToken, DropGuard};
26#[cfg(feature = "tower")]
27pub use tower::*;
28use tracing::{Instrument as _, instrument};
/// Errors surfaced by the service layer while exchanging messages with the peer.
#[derive(Error, Debug)]
#[non_exhaustive]
pub enum ServiceError {
    /// An MCP-level error; the serve loop produces this when the peer answers
    /// a pending request with a JSON-RPC error message.
    #[error("Mcp error: {0}")]
    McpError(McpError),
    /// The transport reported a failure while sending a message.
    #[error("Transport send error: {0}")]
    TransportSend(DynamicTransportError),
    /// The channel to the serve loop, or the reply channel coming back from
    /// it, was closed before a result could be delivered.
    #[error("Transport closed")]
    TransportClosed,
    /// A response arrived but was not of the expected type.
    // NOTE(review): not constructed anywhere in this file — presumably used by
    // the role-specific modules; confirm.
    #[error("Unexpected response type")]
    UnexpectedResponse,
    /// The request was cancelled; a missing `reason` is rendered as `<unknown>`.
    #[error("task cancelled for reason {}", reason.as_deref().unwrap_or("<unknown>"))]
    Cancelled { reason: Option<String> },
    /// No response arrived within `timeout`.
    #[error("request timeout after {}", chrono::Duration::from_std(*timeout).unwrap_or_default())]
    Timeout { timeout: Duration },
}
45
/// Private shorthand for the bounds every message payload must satisfy:
/// debuggable, cloneable, (de)serializable with serde, and safe to move and
/// share across threads for the `'static` lifetime.
trait TransferObject:
    std::fmt::Debug + Clone + serde::Serialize + serde::de::DeserializeOwned + Send + Sync + 'static
{
}

/// Blanket implementation: any type meeting the bounds is a `TransferObject`,
/// so the trait never has to be implemented by hand.
impl<T> TransferObject for T where
    T: std::fmt::Debug
        + serde::Serialize
        + serde::de::DeserializeOwned
        + Send
        + Sync
        + 'static
        + Clone
{
}
61
/// Describes one side of a connection by naming the message types it sends
/// and the message types it receives from its peer.
///
/// The bounds mention the private `TransferObject` trait, hence the
/// `private_bounds` allowance.
#[allow(private_bounds, reason = "there's no the third implementation")]
pub trait ServiceRole: std::fmt::Debug + Send + Sync + 'static + Copy + Clone {
    /// Request type this side sends to the peer.
    type Req: TransferObject + GetMeta + GetExtensions;
    /// Response type this side produces for the peer's requests.
    type Resp: TransferObject;
    /// Notification type this side sends. Converting it to a
    /// [`CancelledNotification`] hands the original value back on failure.
    type Not: TryInto<CancelledNotification, Error = Self::Not>
        + From<CancelledNotification>
        + TransferObject;
    /// Request type received from the peer.
    type PeerReq: TransferObject + GetMeta + GetExtensions;
    /// Response type received from the peer.
    type PeerResp: TransferObject;
    /// Notification type received from the peer. Converting it to a
    /// [`CancelledNotification`] hands the original value back on failure.
    type PeerNot: TryInto<CancelledNotification, Error = Self::PeerNot>
        + From<CancelledNotification>
        + TransferObject
        + GetMeta
        + GetExtensions;
    /// Error type returned by [`ServiceExt::serve`] and
    /// [`ServiceExt::serve_with_ct`].
    type InitializeError;
    /// `true` for the client role; the serve loop uses it to pick its start-up
    /// log message.
    const IS_CLIENT: bool;
    /// Information this side reports about itself (see [`Service::get_info`]).
    type Info: TransferObject;
    /// Information known about the peer (see [`Peer::peer_info`]).
    type PeerInfo: TransferObject;
}
81
/// JSON-RPC message as sent by role `R`: its own requests, responses and
/// notifications.
pub type TxJsonRpcMessage<R> =
    JsonRpcMessage<<R as ServiceRole>::Req, <R as ServiceRole>::Resp, <R as ServiceRole>::Not>;
/// JSON-RPC message as received by role `R`: the peer's requests, responses
/// and notifications.
pub type RxJsonRpcMessage<R> = JsonRpcMessage<
    <R as ServiceRole>::PeerReq,
    <R as ServiceRole>::PeerResp,
    <R as ServiceRole>::PeerNot,
>;
89
/// Handler for the messages a peer sends to this side of the connection.
pub trait Service<R: ServiceRole>: Send + Sync + 'static {
    /// Handles one request from the peer and yields either the response or an
    /// MCP error; the serve loop sends whichever it is back to the peer.
    fn handle_request(
        &self,
        request: R::PeerReq,
        context: RequestContext<R>,
    ) -> impl Future<Output = Result<R::Resp, McpError>> + Send + '_;
    /// Handles one notification from the peer. Nothing is sent back; the
    /// serve loop only logs an `Err` result.
    fn handle_notification(
        &self,
        notification: R::PeerNot,
        context: NotificationContext<R>,
    ) -> impl Future<Output = Result<(), McpError>> + Send + '_;
    /// Returns this side's own information.
    fn get_info(&self) -> R::Info;
}
103
104pub trait ServiceExt<R: ServiceRole>: Service<R> + Sized {
105 fn into_dyn(self) -> Box<dyn DynService<R>> {
109 Box::new(self)
110 }
111 fn serve<T, E, A>(
112 self,
113 transport: T,
114 ) -> impl Future<Output = Result<RunningService<R, Self>, R::InitializeError>> + Send
115 where
116 T: IntoTransport<R, E, A>,
117 E: std::error::Error + Send + Sync + 'static,
118 Self: Sized,
119 {
120 Self::serve_with_ct(self, transport, Default::default())
121 }
122 fn serve_with_ct<T, E, A>(
123 self,
124 transport: T,
125 ct: CancellationToken,
126 ) -> impl Future<Output = Result<RunningService<R, Self>, R::InitializeError>> + Send
127 where
128 T: IntoTransport<R, E, A>,
129 E: std::error::Error + Send + Sync + 'static,
130 Self: Sized;
131}
132
133impl<R: ServiceRole> Service<R> for Box<dyn DynService<R>> {
134 fn handle_request(
135 &self,
136 request: R::PeerReq,
137 context: RequestContext<R>,
138 ) -> impl Future<Output = Result<R::Resp, McpError>> + Send + '_ {
139 DynService::handle_request(self.as_ref(), request, context)
140 }
141
142 fn handle_notification(
143 &self,
144 notification: R::PeerNot,
145 context: NotificationContext<R>,
146 ) -> impl Future<Output = Result<(), McpError>> + Send + '_ {
147 DynService::handle_notification(self.as_ref(), notification, context)
148 }
149
150 fn get_info(&self) -> R::Info {
151 DynService::get_info(self.as_ref())
152 }
153}
154
/// Variant of [`Service`] whose handlers return boxed futures instead of
/// `impl Future`, so it can be used as `dyn DynService<R>`.
pub trait DynService<R: ServiceRole>: Send + Sync {
    /// Boxed-future counterpart of [`Service::handle_request`].
    fn handle_request(
        &self,
        request: R::PeerReq,
        context: RequestContext<R>,
    ) -> BoxFuture<'_, Result<R::Resp, McpError>>;
    /// Boxed-future counterpart of [`Service::handle_notification`].
    fn handle_notification(
        &self,
        notification: R::PeerNot,
        context: NotificationContext<R>,
    ) -> BoxFuture<'_, Result<(), McpError>>;
    /// Counterpart of [`Service::get_info`].
    fn get_info(&self) -> R::Info;
}
168
169impl<R: ServiceRole, S: Service<R>> DynService<R> for S {
170 fn handle_request(
171 &self,
172 request: R::PeerReq,
173 context: RequestContext<R>,
174 ) -> BoxFuture<'_, Result<R::Resp, McpError>> {
175 Box::pin(self.handle_request(request, context))
176 }
177 fn handle_notification(
178 &self,
179 notification: R::PeerNot,
180 context: NotificationContext<R>,
181 ) -> BoxFuture<'_, Result<(), McpError>> {
182 Box::pin(self.handle_notification(notification, context))
183 }
184 fn get_info(&self) -> R::Info {
185 self.get_info()
186 }
187}
188
189use std::{
190 collections::{HashMap, VecDeque},
191 ops::Deref,
192 sync::{Arc, atomic::AtomicU64},
193 time::Duration,
194};
195
196use tokio::sync::mpsc;
197
/// Source of request ids for outgoing requests.
pub trait RequestIdProvider: Send + Sync + 'static {
    /// Returns the id to use for the next outgoing request.
    fn next_request_id(&self) -> RequestId;
}

/// Source of progress tokens for outgoing requests.
pub trait ProgressTokenProvider: Send + Sync + 'static {
    /// Returns the progress token to attach to the next outgoing request.
    fn next_progress_token(&self) -> ProgressToken;
}
205
/// [`AtomicU32Provider`] under the name used where it supplies request ids.
pub type AtomicU32RequestIdProvider = AtomicU32Provider;
/// [`AtomicU32Provider`] under the name used where it supplies progress tokens.
pub type AtomicU32ProgressTokenProvider = AtomicU32Provider;

/// Counter-backed provider of request ids and progress tokens.
///
/// Despite the `U32` in its name, the counter is an `AtomicU64`.
#[derive(Debug, Default)]
pub struct AtomicU32Provider {
    // Next value to hand out; advanced by one on every call.
    id: AtomicU64,
}
213
214impl RequestIdProvider for AtomicU32Provider {
215 fn next_request_id(&self) -> RequestId {
216 let id = self.id.fetch_add(1, std::sync::atomic::Ordering::SeqCst);
217 RequestId::Number(id as i64)
219 }
220}
221
222impl ProgressTokenProvider for AtomicU32Provider {
223 fn next_progress_token(&self) -> ProgressToken {
224 let id = self.id.fetch_add(1, std::sync::atomic::Ordering::SeqCst);
225 ProgressToken(NumberOrString::Number(id as i64))
226 }
227}
228
/// One-shot sender the serve loop uses to deliver the single result of a
/// request or notification to the caller waiting on it.
type Responder<T> = tokio::sync::oneshot::Sender<T>;
230
/// Handle to a request that has been queued for sending; use it to await the
/// response or to cancel the request.
#[derive(Debug)]
pub struct RequestHandle<R: ServiceRole> {
    /// Receives the peer's response, or the error that ended the request.
    pub rx: tokio::sync::oneshot::Receiver<Result<R::PeerResp, ServiceError>>,
    /// Options the request was sent with; `timeout` bounds `await_response`.
    pub options: PeerRequestOptions,
    /// Peer the request was sent through; used to send the cancellation
    /// notification.
    pub peer: Peer<R>,
    /// Id assigned to the request.
    pub id: RequestId,
    /// Progress token attached to the request's meta.
    pub progress_token: ProgressToken,
}
244
245impl<R: ServiceRole> RequestHandle<R> {
246 pub const REQUEST_TIMEOUT_REASON: &str = "request timeout";
247 pub async fn await_response(self) -> Result<R::PeerResp, ServiceError> {
248 if let Some(timeout) = self.options.timeout {
249 let timeout_result = tokio::time::timeout(timeout, async move {
250 self.rx.await.map_err(|_e| ServiceError::TransportClosed)?
251 })
252 .await;
253 match timeout_result {
254 Ok(response) => response,
255 Err(_) => {
256 let error = Err(ServiceError::Timeout { timeout });
257 let notification = CancelledNotification {
259 params: CancelledNotificationParam {
260 request_id: self.id,
261 reason: Some(Self::REQUEST_TIMEOUT_REASON.to_owned()),
262 },
263 method: crate::model::CancelledNotificationMethod,
264 extensions: Default::default(),
265 };
266 let _ = self.peer.send_notification(notification.into()).await;
267 error
268 }
269 }
270 } else {
271 self.rx.await.map_err(|_e| ServiceError::TransportClosed)?
272 }
273 }
274
275 pub async fn cancel(self, reason: Option<String>) -> Result<(), ServiceError> {
277 let notification = CancelledNotification {
278 params: CancelledNotificationParam {
279 request_id: self.id,
280 reason,
281 },
282 method: crate::model::CancelledNotificationMethod,
283 extensions: Default::default(),
284 };
285 self.peer.send_notification(notification.into()).await?;
286 Ok(())
287 }
288}
289
/// Message passed from a [`Peer`] handle to the serve loop, asking it to send
/// something to the remote side.
#[derive(Debug)]
pub(crate) enum PeerSinkMessage<R: ServiceRole> {
    /// Send `request` under `id`; `responder` later receives the peer's
    /// response or the error that ended the request.
    Request {
        request: R::Req,
        id: RequestId,
        responder: Responder<Result<R::PeerResp, ServiceError>>,
    },
    /// Send `notification`; `responder` receives the outcome of the send.
    Notification {
        notification: R::Not,
        responder: Responder<Result<(), ServiceError>>,
    },
}
302
/// Cloneable handle for sending requests and notifications to the remote
/// side. Messages are queued on a channel that the serve loop drains.
#[derive(Clone)]
pub struct Peer<R: ServiceRole> {
    // Channel into the serve loop.
    tx: mpsc::Sender<PeerSinkMessage<R>>,
    // Allocates the id of each outgoing request.
    request_id_provider: Arc<dyn RequestIdProvider>,
    // Allocates the progress token attached to each outgoing request.
    progress_token_provider: Arc<dyn ProgressTokenProvider>,
    // Peer information; set at most once and shared between clones.
    info: Arc<tokio::sync::OnceCell<R::PeerInfo>>,
}
315
316impl<R: ServiceRole> std::fmt::Debug for Peer<R> {
317 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
318 f.debug_struct("PeerSink")
319 .field("tx", &self.tx)
320 .field("is_client", &R::IS_CLIENT)
321 .finish()
322 }
323}
324
/// Receiving end of a [`Peer`]'s channel, consumed by the serve loop.
type ProxyOutbound<R> = mpsc::Receiver<PeerSinkMessage<R>>;
326
327#[derive(Debug, Default)]
328pub struct PeerRequestOptions {
329 pub timeout: Option<Duration>,
330 pub meta: Option<Meta>,
331}
332
333impl PeerRequestOptions {
334 pub fn no_options() -> Self {
335 Self::default()
336 }
337}
338
339impl<R: ServiceRole> Peer<R> {
340 const CLIENT_CHANNEL_BUFFER_SIZE: usize = 1024;
341 pub(crate) fn new(
342 request_id_provider: Arc<dyn RequestIdProvider>,
343 peer_info: Option<R::PeerInfo>,
344 ) -> (Peer<R>, ProxyOutbound<R>) {
345 let (tx, rx) = mpsc::channel(Self::CLIENT_CHANNEL_BUFFER_SIZE);
346 (
347 Self {
348 tx,
349 request_id_provider,
350 progress_token_provider: Arc::new(AtomicU32ProgressTokenProvider::default()),
351 info: Arc::new(tokio::sync::OnceCell::new_with(peer_info)),
352 },
353 rx,
354 )
355 }
356 pub async fn send_notification(&self, notification: R::Not) -> Result<(), ServiceError> {
357 let (responder, receiver) = tokio::sync::oneshot::channel();
358 self.tx
359 .send(PeerSinkMessage::Notification {
360 notification,
361 responder,
362 })
363 .await
364 .map_err(|_m| ServiceError::TransportClosed)?;
365 receiver.await.map_err(|_e| ServiceError::TransportClosed)?
366 }
367 pub async fn send_request(&self, request: R::Req) -> Result<R::PeerResp, ServiceError> {
368 self.send_request_with_option(request, PeerRequestOptions::no_options())
369 .await?
370 .await_response()
371 .await
372 }
373
374 pub async fn send_cancellable_request(
375 &self,
376 request: R::Req,
377 options: PeerRequestOptions,
378 ) -> Result<RequestHandle<R>, ServiceError> {
379 self.send_request_with_option(request, options).await
380 }
381
382 pub async fn send_request_with_option(
383 &self,
384 mut request: R::Req,
385 options: PeerRequestOptions,
386 ) -> Result<RequestHandle<R>, ServiceError> {
387 let id = self.request_id_provider.next_request_id();
388 let progress_token = self.progress_token_provider.next_progress_token();
389 request
390 .get_meta_mut()
391 .set_progress_token(progress_token.clone());
392 if let Some(meta) = options.meta.clone() {
393 request.get_meta_mut().extend(meta);
394 }
395 let (responder, receiver) = tokio::sync::oneshot::channel();
396 self.tx
397 .send(PeerSinkMessage::Request {
398 request,
399 id: id.clone(),
400 responder,
401 })
402 .await
403 .map_err(|_m| ServiceError::TransportClosed)?;
404 Ok(RequestHandle {
405 id,
406 rx: receiver,
407 progress_token,
408 options,
409 peer: self.clone(),
410 })
411 }
412 pub fn peer_info(&self) -> Option<&R::PeerInfo> {
413 self.info.get()
414 }
415
416 pub fn set_peer_info(&self, info: R::PeerInfo) {
417 if self.info.initialized() {
418 tracing::warn!("trying to set peer info, which is already initialized");
419 } else {
420 let _ = self.info.set(info);
421 }
422 }
423
424 pub fn is_transport_closed(&self) -> bool {
425 self.tx.is_closed()
426 }
427}
428
429#[derive(Debug)]
430pub struct RunningService<R: ServiceRole, S: Service<R>> {
431 service: Arc<S>,
432 peer: Peer<R>,
433 handle: Option<tokio::task::JoinHandle<QuitReason>>,
434 cancellation_token: CancellationToken,
435 dg: DropGuard,
436}
437impl<R: ServiceRole, S: Service<R>> Deref for RunningService<R, S> {
438 type Target = Peer<R>;
439
440 fn deref(&self) -> &Self::Target {
441 &self.peer
442 }
443}
444
445impl<R: ServiceRole, S: Service<R>> RunningService<R, S> {
446 #[inline]
447 pub fn peer(&self) -> &Peer<R> {
448 &self.peer
449 }
450 #[inline]
451 pub fn service(&self) -> &S {
452 self.service.as_ref()
453 }
454 #[inline]
455 pub fn cancellation_token(&self) -> RunningServiceCancellationToken {
456 RunningServiceCancellationToken(self.cancellation_token.clone())
457 }
458
459 #[inline]
461 pub fn is_closed(&self) -> bool {
462 self.handle.is_none() || self.cancellation_token.is_cancelled()
463 }
464
465 #[inline]
470 pub async fn waiting(mut self) -> Result<QuitReason, tokio::task::JoinError> {
471 match self.handle.take() {
472 Some(handle) => handle.await,
473 None => Ok(QuitReason::Closed),
474 }
475 }
476
477 pub async fn close(&mut self) -> Result<QuitReason, tokio::task::JoinError> {
495 if let Some(handle) = self.handle.take() {
496 self.cancellation_token.cancel();
499 handle.await
500 } else {
501 Ok(QuitReason::Closed)
503 }
504 }
505
506 pub async fn close_with_timeout(
515 &mut self,
516 timeout: Duration,
517 ) -> Result<Option<QuitReason>, tokio::task::JoinError> {
518 if let Some(handle) = self.handle.take() {
519 self.cancellation_token.cancel();
520 match tokio::time::timeout(timeout, handle).await {
521 Ok(result) => result.map(Some),
522 Err(_elapsed) => {
523 tracing::warn!(
524 "close_with_timeout: cleanup did not complete within {:?}",
525 timeout
526 );
527 Ok(None)
528 }
529 }
530 } else {
531 Ok(Some(QuitReason::Closed))
532 }
533 }
534
535 pub async fn cancel(mut self) -> Result<QuitReason, tokio::task::JoinError> {
540 let _ = std::mem::replace(&mut self.dg, self.cancellation_token.clone().drop_guard());
542 self.close().await
543 }
544}
545
546impl<R: ServiceRole, S: Service<R>> Drop for RunningService<R, S> {
547 fn drop(&mut self) {
548 if self.handle.is_some() && !self.cancellation_token.is_cancelled() {
549 tracing::debug!(
550 "RunningService dropped without explicit close(). \
551 The connection will be closed asynchronously. \
552 For guaranteed cleanup, call close() or cancel() before dropping."
553 );
554 }
555 }
557}
558
559pub struct RunningServiceCancellationToken(CancellationToken);
561
562impl RunningServiceCancellationToken {
563 pub fn cancel(self) {
564 self.0.cancel();
565 }
566}
567
/// Why the event loop of a running service stopped.
#[derive(Debug)]
#[non_exhaustive]
pub enum QuitReason {
    /// The loop's cancellation token was cancelled.
    Cancelled,
    /// The transport's input ended; also reported by `RunningService` methods
    /// when the join handle had already been taken.
    Closed,
    /// Joining one of the loop's send tasks failed.
    JoinError(tokio::task::JoinError),
}
575
576#[derive(Debug, Clone)]
578pub struct RequestContext<R: ServiceRole> {
579 pub ct: CancellationToken,
581 pub id: RequestId,
582 pub meta: Meta,
583 pub extensions: Extensions,
584 pub peer: Peer<R>,
586}
587
588impl<R: ServiceRole> RequestContext<R> {
589 pub fn new(id: RequestId, peer: Peer<R>) -> Self {
591 Self {
592 ct: CancellationToken::new(),
593 id,
594 meta: Meta::default(),
595 extensions: Extensions::default(),
596 peer,
597 }
598 }
599}
600
/// Context handed to [`Service::handle_notification`] alongside the
/// notification.
#[derive(Debug, Clone)]
pub struct NotificationContext<R: ServiceRole> {
    /// Meta of the notification.
    pub meta: Meta,
    /// Extensions of the notification.
    pub extensions: Extensions,
    /// Handle for sending messages back to the peer.
    pub peer: Peer<R>,
}
609
610pub fn serve_directly<R, S, T, E, A>(
612 service: S,
613 transport: T,
614 peer_info: Option<R::PeerInfo>,
615) -> RunningService<R, S>
616where
617 R: ServiceRole,
618 S: Service<R>,
619 T: IntoTransport<R, E, A>,
620 E: std::error::Error + Send + Sync + 'static,
621{
622 serve_directly_with_ct(service, transport, peer_info, Default::default())
623}
624
625pub fn serve_directly_with_ct<R, S, T, E, A>(
627 service: S,
628 transport: T,
629 peer_info: Option<R::PeerInfo>,
630 ct: CancellationToken,
631) -> RunningService<R, S>
632where
633 R: ServiceRole,
634 S: Service<R>,
635 T: IntoTransport<R, E, A>,
636 E: std::error::Error + Send + Sync + 'static,
637{
638 let (peer, peer_rx) = Peer::new(Arc::new(AtomicU32RequestIdProvider::default()), peer_info);
639 serve_inner(service, transport.into_transport(), peer, peer_rx, ct)
640}
641
642#[instrument(skip_all)]
643fn serve_inner<R, S, T>(
644 service: S,
645 transport: T,
646 peer: Peer<R>,
647 mut peer_rx: tokio::sync::mpsc::Receiver<PeerSinkMessage<R>>,
648 ct: CancellationToken,
649) -> RunningService<R, S>
650where
651 R: ServiceRole,
652 S: Service<R>,
653 T: Transport<R> + 'static,
654{
655 const SINK_PROXY_BUFFER_SIZE: usize = 64;
656 let (sink_proxy_tx, mut sink_proxy_rx) =
657 tokio::sync::mpsc::channel::<TxJsonRpcMessage<R>>(SINK_PROXY_BUFFER_SIZE);
658 let peer_info = peer.peer_info();
659 if R::IS_CLIENT {
660 tracing::info!(?peer_info, "Service initialized as client");
661 } else {
662 tracing::info!(?peer_info, "Service initialized as server");
663 }
664
665 let mut local_responder_pool =
666 HashMap::<RequestId, Responder<Result<R::PeerResp, ServiceError>>>::new();
667 let mut local_ct_pool = HashMap::<RequestId, CancellationToken>::new();
668 let shared_service = Arc::new(service);
669 let service = shared_service.clone();
671
672 let serve_loop_ct = ct.child_token();
675 let peer_return: Peer<R> = peer.clone();
676 let current_span = tracing::Span::current();
677 let handle = tokio::spawn(async move {
678 let mut transport = transport.into_transport();
679 let mut batch_messages = VecDeque::<RxJsonRpcMessage<R>>::new();
680 let mut send_task_set = tokio::task::JoinSet::<SendTaskResult>::new();
681 #[derive(Debug)]
682 enum SendTaskResult {
683 Request {
684 id: RequestId,
685 result: Result<(), DynamicTransportError>,
686 },
687 Notification {
688 responder: Responder<Result<(), ServiceError>>,
689 cancellation_param: Option<CancelledNotificationParam>,
690 result: Result<(), DynamicTransportError>,
691 },
692 }
693 #[derive(Debug)]
694 enum Event<R: ServiceRole> {
695 ProxyMessage(PeerSinkMessage<R>),
696 PeerMessage(RxJsonRpcMessage<R>),
697 ToSink(TxJsonRpcMessage<R>),
698 SendTaskResult(SendTaskResult),
699 }
700
701 let quit_reason = loop {
702 let evt = if let Some(m) = batch_messages.pop_front() {
703 Event::PeerMessage(m)
704 } else {
705 tokio::select! {
706 m = sink_proxy_rx.recv(), if !sink_proxy_rx.is_closed() => {
707 if let Some(m) = m {
708 Event::ToSink(m)
709 } else {
710 continue
711 }
712 }
713 m = transport.receive() => {
714 if let Some(m) = m {
715 Event::PeerMessage(m)
716 } else {
717 tracing::info!("input stream terminated");
719 break QuitReason::Closed
720 }
721 }
722 m = peer_rx.recv(), if !peer_rx.is_closed() => {
723 if let Some(m) = m {
724 Event::ProxyMessage(m)
725 } else {
726 continue
727 }
728 }
729 m = send_task_set.join_next(), if !send_task_set.is_empty() => {
730 let Some(result) = m else {
731 continue
732 };
733 match result {
734 Err(e) => {
735 tracing::error!(%e, "send request task encounter a tokio join error");
737 break QuitReason::JoinError(e)
738 }
739 Ok(result) => {
740 Event::SendTaskResult(result)
741 }
742 }
743 }
744 _ = serve_loop_ct.cancelled() => {
745 tracing::info!("task cancelled");
746 break QuitReason::Cancelled
747 }
748 }
749 };
750
751 tracing::trace!(?evt, "new event");
752 match evt {
753 Event::SendTaskResult(SendTaskResult::Request { id, result }) => {
754 if let Err(e) = result {
755 if let Some(responder) = local_responder_pool.remove(&id) {
756 let _ = responder.send(Err(ServiceError::TransportSend(e)));
757 }
758 }
759 }
760 Event::SendTaskResult(SendTaskResult::Notification {
761 responder,
762 result,
763 cancellation_param,
764 }) => {
765 let response = if let Err(e) = result {
766 Err(ServiceError::TransportSend(e))
767 } else {
768 Ok(())
769 };
770 let _ = responder.send(response);
771 if let Some(param) = cancellation_param {
772 if let Some(responder) = local_responder_pool.remove(¶m.request_id) {
773 tracing::info!(id = %param.request_id, reason = param.reason, "cancelled");
774 let _response_result = responder.send(Err(ServiceError::Cancelled {
775 reason: param.reason.clone(),
776 }));
777 }
778 }
779 }
780 Event::ToSink(m) => {
782 if let Some(id) = match &m {
783 JsonRpcMessage::Response(response) => Some(&response.id),
784 JsonRpcMessage::Error(error) => Some(&error.id),
785 _ => None,
786 } {
787 if let Some(ct) = local_ct_pool.remove(id) {
788 ct.cancel();
789 }
790 let send = transport.send(m);
791 let current_span = tracing::Span::current();
792 tokio::spawn(async move {
793 let send_result = send.await;
794 if let Err(error) = send_result {
795 tracing::error!(%error, "fail to response message");
796 }
797 }.instrument(current_span));
798 }
799 }
800 Event::ProxyMessage(PeerSinkMessage::Request {
801 request,
802 id,
803 responder,
804 }) => {
805 local_responder_pool.insert(id.clone(), responder);
806 let send = transport.send(JsonRpcMessage::request(request, id.clone()));
807 {
808 let id = id.clone();
809 let current_span = tracing::Span::current();
810 send_task_set.spawn(send.map(move |r| SendTaskResult::Request {
811 id,
812 result: r.map_err(DynamicTransportError::new::<T, R>),
813 }).instrument(current_span));
814 }
815 }
816 Event::ProxyMessage(PeerSinkMessage::Notification {
817 notification,
818 responder,
819 }) => {
820 let mut cancellation_param = None;
822 let notification = match notification.try_into() {
823 Ok::<CancelledNotification, _>(cancelled) => {
824 cancellation_param.replace(cancelled.params.clone());
825 cancelled.into()
826 }
827 Err(notification) => notification,
828 };
829 let send = transport.send(JsonRpcMessage::notification(notification));
830 let current_span = tracing::Span::current();
831 send_task_set.spawn(send.map(move |result| SendTaskResult::Notification {
832 responder,
833 cancellation_param,
834 result: result.map_err(DynamicTransportError::new::<T, R>),
835 }).instrument(current_span));
836 }
837 Event::PeerMessage(JsonRpcMessage::Request(JsonRpcRequest {
838 id,
839 mut request,
840 ..
841 })) => {
842 tracing::debug!(%id, ?request, "received request");
843 {
844 let service = shared_service.clone();
845 let sink = sink_proxy_tx.clone();
846 let request_ct = serve_loop_ct.child_token();
847 let context_ct = request_ct.child_token();
848 local_ct_pool.insert(id.clone(), request_ct);
849 let mut extensions = Extensions::new();
850 let mut meta = Meta::new();
851 std::mem::swap(&mut meta, request.get_meta_mut());
854 std::mem::swap(&mut extensions, request.extensions_mut());
855 let context = RequestContext {
856 ct: context_ct,
857 id: id.clone(),
858 peer: peer.clone(),
859 meta,
860 extensions,
861 };
862 let current_span = tracing::Span::current();
863 tokio::spawn(async move {
864 let result = service
865 .handle_request(request, context)
866 .await;
867 let response = match result {
868 Ok(result) => {
869 tracing::debug!(%id, ?result, "response message");
870 JsonRpcMessage::response(result, id)
871 }
872 Err(error) => {
873 tracing::warn!(%id, ?error, "response error");
874 JsonRpcMessage::error(error, id)
875 }
876 };
877 let _send_result = sink.send(response).await;
878 }.instrument(current_span));
879 }
880 }
881 Event::PeerMessage(JsonRpcMessage::Notification(JsonRpcNotification {
882 notification,
883 ..
884 })) => {
885 tracing::info!(?notification, "received notification");
886 let mut notification = match notification.try_into() {
888 Ok::<CancelledNotification, _>(cancelled) => {
889 if let Some(ct) = local_ct_pool.remove(&cancelled.params.request_id) {
890 tracing::info!(id = %cancelled.params.request_id, reason = cancelled.params.reason, "cancelled");
891 ct.cancel();
892 }
893 cancelled.into()
894 }
895 Err(notification) => notification,
896 };
897 {
898 let service = shared_service.clone();
899 let mut extensions = Extensions::new();
900 let mut meta = Meta::new();
901 std::mem::swap(&mut extensions, notification.extensions_mut());
903 std::mem::swap(&mut meta, notification.get_meta_mut());
904 let context = NotificationContext {
905 peer: peer.clone(),
906 meta,
907 extensions,
908 };
909 let current_span = tracing::Span::current();
910 tokio::spawn(async move {
911 let result = service.handle_notification(notification, context).await;
912 if let Err(error) = result {
913 tracing::warn!(%error, "Error sending notification");
914 }
915 }.instrument(current_span));
916 }
917 }
918 Event::PeerMessage(JsonRpcMessage::Response(JsonRpcResponse {
919 result,
920 id,
921 ..
922 })) => {
923 if let Some(responder) = local_responder_pool.remove(&id) {
924 let response_result = responder.send(Ok(result));
925 if let Err(_error) = response_result {
926 tracing::warn!(%id, "Error sending response");
927 }
928 }
929 }
930 Event::PeerMessage(JsonRpcMessage::Error(JsonRpcError { error, id, .. })) => {
931 if let Some(responder) = local_responder_pool.remove(&id) {
932 let _response_result = responder.send(Err(ServiceError::McpError(error)));
933 if let Err(_error) = _response_result {
934 tracing::warn!(%id, "Error sending response");
935 }
936 }
937 }
938 }
939 };
940 let sink_close_result = transport.close().await;
941 if let Err(e) = sink_close_result {
942 tracing::error!(%e, "fail to close sink");
943 }
944 tracing::info!(?quit_reason, "serve finished");
945 quit_reason
946 }.instrument(current_span));
947 RunningService {
948 service,
949 peer: peer_return,
950 handle: Some(handle),
951 cancellation_token: ct.clone(),
952 dg: ct.drop_guard(),
953 }
954}