1use futures::{FutureExt, future::BoxFuture};
2use thiserror::Error;
3
4use crate::{
5 error::ErrorData as McpError,
6 model::{
7 CancelledNotification, CancelledNotificationParam, Extensions, GetExtensions, GetMeta,
8 JsonRpcError, JsonRpcMessage, JsonRpcNotification, JsonRpcRequest, JsonRpcResponse, Meta,
9 NumberOrString, ProgressToken, RequestId, ServerJsonRpcMessage,
10 },
11 transport::{DynamicTransportError, IntoTransport, Transport},
12};
13#[cfg(feature = "client")]
14#[cfg_attr(docsrs, doc(cfg(feature = "client")))]
15mod client;
16#[cfg(feature = "client")]
17#[cfg_attr(docsrs, doc(cfg(feature = "client")))]
18pub use client::*;
19#[cfg(feature = "server")]
20#[cfg_attr(docsrs, doc(cfg(feature = "server")))]
21mod server;
22#[cfg(feature = "server")]
23#[cfg_attr(docsrs, doc(cfg(feature = "server")))]
24pub use server::*;
25#[cfg(feature = "tower")]
26#[cfg_attr(docsrs, doc(cfg(feature = "tower")))]
27mod tower;
28use tokio_util::sync::{CancellationToken, DropGuard};
29#[cfg(feature = "tower")]
30#[cfg_attr(docsrs, doc(cfg(feature = "tower")))]
31pub use tower::*;
32use tracing::{Instrument as _, instrument};
/// Errors reported to callers that send requests or notifications through a [`Peer`].
#[derive(Error, Debug)]
#[non_exhaustive]
pub enum ServiceError {
    /// The peer replied to a request with a JSON-RPC error message.
    #[error("Mcp error: {0}")]
    McpError(McpError),
    /// The transport reported a failure while sending an outgoing message.
    #[error("Transport send error: {0}")]
    TransportSend(DynamicTransportError),
    /// The channel to the serve loop, or the reply channel coming back from it, was closed.
    #[error("Transport closed")]
    TransportClosed,
    /// NOTE(review): not constructed anywhere in this file; presumably produced by
    /// role-specific code when a response has an unexpected variant — confirm.
    #[error("Unexpected response type")]
    UnexpectedResponse,
    /// A pending request was resolved because a cancellation notification for it was sent.
    #[error("task cancelled for reason {}", reason.as_deref().unwrap_or("<unknown>"))]
    Cancelled { reason: Option<String> },
    /// No response arrived within the timeout configured in [`PeerRequestOptions`].
    #[error("request timeout after {}", chrono::Duration::from_std(*timeout).unwrap_or_default())]
    Timeout { timeout: Duration },
}
49
50trait TransferObject:
51 std::fmt::Debug + Clone + serde::Serialize + serde::de::DeserializeOwned + Send + Sync + 'static
52{
53}
54
55impl<T> TransferObject for T where
56 T: std::fmt::Debug
57 + serde::Serialize
58 + serde::de::DeserializeOwned
59 + Send
60 + Sync
61 + 'static
62 + Clone
63{
64}
65
/// Describes one side of a connection by fixing the message types it sends and receives.
///
/// The `Req`/`Resp`/`Not` types form [`TxJsonRpcMessage`] (outgoing messages) and the
/// `Peer*` types form [`RxJsonRpcMessage`] (incoming messages).
#[allow(private_bounds, reason = "there's no the third implementation")]
pub trait ServiceRole: std::fmt::Debug + Send + Sync + 'static + Copy + Clone {
    /// Request type this role sends to its peer.
    type Req: TransferObject + GetMeta + GetExtensions;
    /// Response type this role sends back for a peer request.
    type Resp: TransferObject;
    /// Notification type this role sends; must be convertible to and from
    /// [`CancelledNotification`] so outgoing cancellations can be recognised.
    type Not: TryInto<CancelledNotification, Error = Self::Not>
        + From<CancelledNotification>
        + TransferObject;
    /// Request type received from the peer.
    type PeerReq: TransferObject + GetMeta + GetExtensions;
    /// Response type received from the peer.
    type PeerResp: TransferObject;
    /// Notification type received from the peer; must be convertible to and from
    /// [`CancelledNotification`] so incoming cancellations can be recognised.
    type PeerNot: TryInto<CancelledNotification, Error = Self::PeerNot>
        + From<CancelledNotification>
        + TransferObject
        + GetMeta
        + GetExtensions;
    /// Error type returned by [`ServiceExt::serve`] and [`ServiceExt::serve_with_ct`].
    type InitializeError;
    /// `true` for the client role; in this file it only selects the start-up log message.
    const IS_CLIENT: bool;
    /// Information this side reports about itself (see [`Service::get_info`]).
    type Info: TransferObject;
    /// Information about the remote side, stored in [`Peer`].
    type PeerInfo: TransferObject;
}
85
/// JSON-RPC message type that role `R` sends to its peer.
pub type TxJsonRpcMessage<R> =
    JsonRpcMessage<<R as ServiceRole>::Req, <R as ServiceRole>::Resp, <R as ServiceRole>::Not>;
/// JSON-RPC message type that role `R` receives from its peer.
pub type RxJsonRpcMessage<R> = JsonRpcMessage<
    <R as ServiceRole>::PeerReq,
    <R as ServiceRole>::PeerResp,
    <R as ServiceRole>::PeerNot,
>;
93
/// Handler for the messages that arrive from the peer of role `R`.
pub trait Service<R: ServiceRole>: Send + Sync + 'static {
    /// Handles one incoming request.
    ///
    /// The serve loop turns `Ok` into a JSON-RPC response and `Err` into a JSON-RPC
    /// error carrying the same request id.
    fn handle_request(
        &self,
        request: R::PeerReq,
        context: RequestContext<R>,
    ) -> impl Future<Output = Result<R::Resp, McpError>> + Send + '_;
    /// Handles one incoming notification; an `Err` is only logged by the serve loop.
    fn handle_notification(
        &self,
        notification: R::PeerNot,
        context: NotificationContext<R>,
    ) -> impl Future<Output = Result<(), McpError>> + Send + '_;
    /// Returns this side's own information.
    fn get_info(&self) -> R::Info;
}
107
108pub trait ServiceExt<R: ServiceRole>: Service<R> + Sized {
109 fn into_dyn(self) -> Box<dyn DynService<R>> {
113 Box::new(self)
114 }
115 fn serve<T, E, A>(
116 self,
117 transport: T,
118 ) -> impl Future<Output = Result<RunningService<R, Self>, R::InitializeError>> + Send
119 where
120 T: IntoTransport<R, E, A>,
121 E: std::error::Error + Send + Sync + 'static,
122 Self: Sized,
123 {
124 Self::serve_with_ct(self, transport, Default::default())
125 }
126 fn serve_with_ct<T, E, A>(
127 self,
128 transport: T,
129 ct: CancellationToken,
130 ) -> impl Future<Output = Result<RunningService<R, Self>, R::InitializeError>> + Send
131 where
132 T: IntoTransport<R, E, A>,
133 E: std::error::Error + Send + Sync + 'static,
134 Self: Sized;
135}
136
137impl<R: ServiceRole> Service<R> for Box<dyn DynService<R>> {
138 fn handle_request(
139 &self,
140 request: R::PeerReq,
141 context: RequestContext<R>,
142 ) -> impl Future<Output = Result<R::Resp, McpError>> + Send + '_ {
143 DynService::handle_request(self.as_ref(), request, context)
144 }
145
146 fn handle_notification(
147 &self,
148 notification: R::PeerNot,
149 context: NotificationContext<R>,
150 ) -> impl Future<Output = Result<(), McpError>> + Send + '_ {
151 DynService::handle_notification(self.as_ref(), notification, context)
152 }
153
154 fn get_info(&self) -> R::Info {
155 DynService::get_info(self.as_ref())
156 }
157}
158
/// Variant of [`Service`] whose methods return boxed futures, so it can be used as
/// `dyn DynService<R>`.
pub trait DynService<R: ServiceRole>: Send + Sync {
    /// Boxed-future counterpart of [`Service::handle_request`].
    fn handle_request(
        &self,
        request: R::PeerReq,
        context: RequestContext<R>,
    ) -> BoxFuture<'_, Result<R::Resp, McpError>>;
    /// Boxed-future counterpart of [`Service::handle_notification`].
    fn handle_notification(
        &self,
        notification: R::PeerNot,
        context: NotificationContext<R>,
    ) -> BoxFuture<'_, Result<(), McpError>>;
    /// Counterpart of [`Service::get_info`].
    fn get_info(&self) -> R::Info;
}
172
173impl<R: ServiceRole, S: Service<R>> DynService<R> for S {
174 fn handle_request(
175 &self,
176 request: R::PeerReq,
177 context: RequestContext<R>,
178 ) -> BoxFuture<'_, Result<R::Resp, McpError>> {
179 Box::pin(self.handle_request(request, context))
180 }
181 fn handle_notification(
182 &self,
183 notification: R::PeerNot,
184 context: NotificationContext<R>,
185 ) -> BoxFuture<'_, Result<(), McpError>> {
186 Box::pin(self.handle_notification(notification, context))
187 }
188 fn get_info(&self) -> R::Info {
189 self.get_info()
190 }
191}
192
193use std::{
194 collections::{HashMap, VecDeque},
195 ops::Deref,
196 sync::{Arc, atomic::AtomicU64},
197 time::Duration,
198};
199
200use tokio::sync::mpsc;
201
/// Source of ids for outgoing requests; shared across threads behind an `Arc` by [`Peer`].
pub trait RequestIdProvider: Send + Sync + 'static {
    /// Returns the id to use for the next outgoing request.
    fn next_request_id(&self) -> RequestId;
}
205
/// Source of progress tokens attached to outgoing requests by [`Peer`].
pub trait ProgressTokenProvider: Send + Sync + 'static {
    /// Returns the progress token to attach to the next outgoing request.
    fn next_progress_token(&self) -> ProgressToken;
}
209
/// [`AtomicU32Provider`] used as a [`RequestIdProvider`].
pub type AtomicU32RequestIdProvider = AtomicU32Provider;
/// [`AtomicU32Provider`] used as a [`ProgressTokenProvider`].
pub type AtomicU32ProgressTokenProvider = AtomicU32Provider;

/// Counter-backed provider that starts at zero (via `Default`).
///
/// Despite the name, the counter is stored in an `AtomicU64`.
#[derive(Debug, Default)]
pub struct AtomicU32Provider {
    // Next value to hand out.
    id: AtomicU64,
}
217
218impl RequestIdProvider for AtomicU32Provider {
219 fn next_request_id(&self) -> RequestId {
220 let id = self.id.fetch_add(1, std::sync::atomic::Ordering::SeqCst);
221 RequestId::Number(id as i64)
223 }
224}
225
226impl ProgressTokenProvider for AtomicU32Provider {
227 fn next_progress_token(&self) -> ProgressToken {
228 let id = self.id.fetch_add(1, std::sync::atomic::Ordering::SeqCst);
229 ProgressToken(NumberOrString::Number(id as i64))
230 }
231}
232
/// One-shot sender the serve loop uses to deliver a single result back to a waiting caller.
type Responder<T> = tokio::sync::oneshot::Sender<T>;
234
/// Handle to a request that has been queued for sending; use it to await the response
/// or to cancel the request.
#[derive(Debug)]
pub struct RequestHandle<R: ServiceRole> {
    /// Receives the peer's response, or the error that ended the request.
    pub rx: tokio::sync::oneshot::Receiver<Result<R::PeerResp, ServiceError>>,
    /// Options the request was sent with; `timeout` is honoured by `await_response`.
    pub options: PeerRequestOptions,
    /// Peer used to send a cancellation notification for this request.
    pub peer: Peer<R>,
    /// Id assigned to the request.
    pub id: RequestId,
    /// Progress token that was written into the request's meta.
    pub progress_token: ProgressToken,
}
248
249impl<R: ServiceRole> RequestHandle<R> {
250 pub const REQUEST_TIMEOUT_REASON: &str = "request timeout";
251 pub async fn await_response(self) -> Result<R::PeerResp, ServiceError> {
252 if let Some(timeout) = self.options.timeout {
253 let timeout_result = tokio::time::timeout(timeout, async move {
254 self.rx.await.map_err(|_e| ServiceError::TransportClosed)?
255 })
256 .await;
257 match timeout_result {
258 Ok(response) => response,
259 Err(_) => {
260 let error = Err(ServiceError::Timeout { timeout });
261 let notification = CancelledNotification {
263 params: CancelledNotificationParam {
264 request_id: self.id,
265 reason: Some(Self::REQUEST_TIMEOUT_REASON.to_owned()),
266 },
267 method: crate::model::CancelledNotificationMethod,
268 extensions: Default::default(),
269 };
270 let _ = self.peer.send_notification(notification.into()).await;
271 error
272 }
273 }
274 } else {
275 self.rx.await.map_err(|_e| ServiceError::TransportClosed)?
276 }
277 }
278
279 pub async fn cancel(self, reason: Option<String>) -> Result<(), ServiceError> {
281 let notification = CancelledNotification {
282 params: CancelledNotificationParam {
283 request_id: self.id,
284 reason,
285 },
286 method: crate::model::CancelledNotificationMethod,
287 extensions: Default::default(),
288 };
289 self.peer.send_notification(notification.into()).await?;
290 Ok(())
291 }
292}
293
/// Message passed from a [`Peer`] handle to the serve loop, asking it to send something
/// to the remote side.
#[derive(Debug)]
pub(crate) enum PeerSinkMessage<R: ServiceRole> {
    /// Send `request` under `id`; the eventual response or error goes to `responder`.
    Request {
        request: R::Req,
        id: RequestId,
        responder: Responder<Result<R::PeerResp, ServiceError>>,
    },
    /// Send `notification`; the outcome of the send goes to `responder`.
    Notification {
        notification: R::Not,
        responder: Responder<Result<(), ServiceError>>,
    },
}
306
/// Cloneable handle for sending requests and notifications to the remote side.
///
/// Messages are forwarded over a channel to the serve loop, which owns the transport.
#[derive(Clone)]
pub struct Peer<R: ServiceRole> {
    // Channel into the serve loop.
    tx: mpsc::Sender<PeerSinkMessage<R>>,
    // Generates ids for outgoing requests.
    request_id_provider: Arc<dyn RequestIdProvider>,
    // Generates progress tokens for outgoing requests.
    progress_token_provider: Arc<dyn ProgressTokenProvider>,
    // Information about the remote side; can be set at most once and is shared by all clones.
    info: Arc<tokio::sync::OnceCell<R::PeerInfo>>,
}
319
320impl<R: ServiceRole> std::fmt::Debug for Peer<R> {
321 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
322 f.debug_struct("PeerSink")
323 .field("tx", &self.tx)
324 .field("is_client", &R::IS_CLIENT)
325 .finish()
326 }
327}
328
/// Receiving end of the channel through which [`Peer`] handles reach the serve loop.
type ProxyOutbound<R> = mpsc::Receiver<PeerSinkMessage<R>>;
330
/// Per-request options for [`Peer::send_request_with_option`].
#[derive(Debug, Default)]
pub struct PeerRequestOptions {
    /// Maximum time `RequestHandle::await_response` waits; `None` waits indefinitely.
    pub timeout: Option<Duration>,
    /// Extra meta entries merged into the request's meta before it is sent.
    pub meta: Option<Meta>,
}
336
337impl PeerRequestOptions {
338 pub fn no_options() -> Self {
339 Self::default()
340 }
341}
342
343impl<R: ServiceRole> Peer<R> {
344 const CLIENT_CHANNEL_BUFFER_SIZE: usize = 1024;
345 pub(crate) fn new(
346 request_id_provider: Arc<dyn RequestIdProvider>,
347 peer_info: Option<R::PeerInfo>,
348 ) -> (Peer<R>, ProxyOutbound<R>) {
349 let (tx, rx) = mpsc::channel(Self::CLIENT_CHANNEL_BUFFER_SIZE);
350 (
351 Self {
352 tx,
353 request_id_provider,
354 progress_token_provider: Arc::new(AtomicU32ProgressTokenProvider::default()),
355 info: Arc::new(tokio::sync::OnceCell::new_with(peer_info)),
356 },
357 rx,
358 )
359 }
360 pub async fn send_notification(&self, notification: R::Not) -> Result<(), ServiceError> {
361 let (responder, receiver) = tokio::sync::oneshot::channel();
362 self.tx
363 .send(PeerSinkMessage::Notification {
364 notification,
365 responder,
366 })
367 .await
368 .map_err(|_m| ServiceError::TransportClosed)?;
369 receiver.await.map_err(|_e| ServiceError::TransportClosed)?
370 }
371 pub async fn send_request(&self, request: R::Req) -> Result<R::PeerResp, ServiceError> {
372 self.send_request_with_option(request, PeerRequestOptions::no_options())
373 .await?
374 .await_response()
375 .await
376 }
377
378 pub async fn send_cancellable_request(
379 &self,
380 request: R::Req,
381 options: PeerRequestOptions,
382 ) -> Result<RequestHandle<R>, ServiceError> {
383 self.send_request_with_option(request, options).await
384 }
385
386 pub async fn send_request_with_option(
387 &self,
388 mut request: R::Req,
389 options: PeerRequestOptions,
390 ) -> Result<RequestHandle<R>, ServiceError> {
391 let id = self.request_id_provider.next_request_id();
392 let progress_token = self.progress_token_provider.next_progress_token();
393 request
394 .get_meta_mut()
395 .set_progress_token(progress_token.clone());
396 if let Some(meta) = options.meta.clone() {
397 request.get_meta_mut().extend(meta);
398 }
399 let (responder, receiver) = tokio::sync::oneshot::channel();
400 self.tx
401 .send(PeerSinkMessage::Request {
402 request,
403 id: id.clone(),
404 responder,
405 })
406 .await
407 .map_err(|_m| ServiceError::TransportClosed)?;
408 Ok(RequestHandle {
409 id,
410 rx: receiver,
411 progress_token,
412 options,
413 peer: self.clone(),
414 })
415 }
416 pub fn peer_info(&self) -> Option<&R::PeerInfo> {
417 self.info.get()
418 }
419
420 pub fn set_peer_info(&self, info: R::PeerInfo) {
421 if self.info.initialized() {
422 tracing::warn!("trying to set peer info, which is already initialized");
423 } else {
424 let _ = self.info.set(info);
425 }
426 }
427
428 pub fn is_transport_closed(&self) -> bool {
429 self.tx.is_closed()
430 }
431}
432
/// A service whose serve loop has been spawned as a background task.
///
/// Dereferences to [`Peer`], so peer methods can be called directly on it.
#[derive(Debug)]
pub struct RunningService<R: ServiceRole, S: Service<R>> {
    // The service instance, shared with the serve loop.
    service: Arc<S>,
    // Handle for sending messages to the remote side.
    peer: Peer<R>,
    // Join handle of the spawned serve loop; resolves to the reason it stopped.
    handle: tokio::task::JoinHandle<QuitReason>,
    // Token that stops the serve loop when cancelled.
    cancellation_token: CancellationToken,
    // Guard created from the same token (see tokio_util's `DropGuard`).
    dg: DropGuard,
}
441impl<R: ServiceRole, S: Service<R>> Deref for RunningService<R, S> {
442 type Target = Peer<R>;
443
444 fn deref(&self) -> &Self::Target {
445 &self.peer
446 }
447}
448
449impl<R: ServiceRole, S: Service<R>> RunningService<R, S> {
450 #[inline]
451 pub fn peer(&self) -> &Peer<R> {
452 &self.peer
453 }
454 #[inline]
455 pub fn service(&self) -> &S {
456 self.service.as_ref()
457 }
458 #[inline]
459 pub fn cancellation_token(&self) -> RunningServiceCancellationToken {
460 RunningServiceCancellationToken(self.cancellation_token.clone())
461 }
462 #[inline]
463 pub async fn waiting(self) -> Result<QuitReason, tokio::task::JoinError> {
464 self.handle.await
465 }
466 pub async fn cancel(self) -> Result<QuitReason, tokio::task::JoinError> {
467 let RunningService { dg, handle, .. } = self;
468 dg.disarm().cancel();
469 handle.await
470 }
471}
472
473pub struct RunningServiceCancellationToken(CancellationToken);
475
476impl RunningServiceCancellationToken {
477 pub fn cancel(self) {
478 self.0.cancel();
479 }
480}
481
/// Why the serve loop stopped.
#[derive(Debug)]
pub enum QuitReason {
    /// The serve loop's cancellation token was cancelled.
    Cancelled,
    /// The transport's incoming stream ended.
    Closed,
    /// A spawned send task failed to join.
    JoinError(tokio::task::JoinError),
}
488
/// Context handed to [`Service::handle_request`] alongside each incoming request.
#[derive(Debug, Clone)]
pub struct RequestContext<R: ServiceRole> {
    /// Cancellation token scoped to this request.
    pub ct: CancellationToken,
    /// Id of the request being handled.
    pub id: RequestId,
    /// Meta taken out of the request before it reaches the handler.
    pub meta: Meta,
    /// Extensions taken out of the request before it reaches the handler.
    pub extensions: Extensions,
    /// Handle for sending messages back to the remote side.
    pub peer: Peer<R>,
}
500
/// Context handed to [`Service::handle_notification`] alongside each incoming notification.
#[derive(Debug, Clone)]
pub struct NotificationContext<R: ServiceRole> {
    /// Meta taken out of the notification before it reaches the handler.
    pub meta: Meta,
    /// Extensions taken out of the notification before it reaches the handler.
    pub extensions: Extensions,
    /// Handle for sending messages back to the remote side.
    pub peer: Peer<R>,
}
509
510pub fn serve_directly<R, S, T, E, A>(
512 service: S,
513 transport: T,
514 peer_info: Option<R::PeerInfo>,
515) -> RunningService<R, S>
516where
517 R: ServiceRole,
518 S: Service<R>,
519 T: IntoTransport<R, E, A>,
520 E: std::error::Error + Send + Sync + 'static,
521{
522 serve_directly_with_ct(service, transport, peer_info, Default::default())
523}
524
525pub fn serve_directly_with_ct<R, S, T, E, A>(
527 service: S,
528 transport: T,
529 peer_info: Option<R::PeerInfo>,
530 ct: CancellationToken,
531) -> RunningService<R, S>
532where
533 R: ServiceRole,
534 S: Service<R>,
535 T: IntoTransport<R, E, A>,
536 E: std::error::Error + Send + Sync + 'static,
537{
538 let (peer, peer_rx) = Peer::new(Arc::new(AtomicU32RequestIdProvider::default()), peer_info);
539 serve_inner(service, transport.into_transport(), peer, peer_rx, ct)
540}
541
542#[instrument(skip_all)]
543fn serve_inner<R, S, T>(
544 service: S,
545 transport: T,
546 peer: Peer<R>,
547 mut peer_rx: tokio::sync::mpsc::Receiver<PeerSinkMessage<R>>,
548 ct: CancellationToken,
549) -> RunningService<R, S>
550where
551 R: ServiceRole,
552 S: Service<R>,
553 T: Transport<R> + 'static,
554{
555 const SINK_PROXY_BUFFER_SIZE: usize = 64;
556 let (sink_proxy_tx, mut sink_proxy_rx) =
557 tokio::sync::mpsc::channel::<TxJsonRpcMessage<R>>(SINK_PROXY_BUFFER_SIZE);
558 let peer_info = peer.peer_info();
559 if R::IS_CLIENT {
560 tracing::info!(?peer_info, "Service initialized as client");
561 } else {
562 tracing::info!(?peer_info, "Service initialized as server");
563 }
564
565 let mut local_responder_pool =
566 HashMap::<RequestId, Responder<Result<R::PeerResp, ServiceError>>>::new();
567 let mut local_ct_pool = HashMap::<RequestId, CancellationToken>::new();
568 let shared_service = Arc::new(service);
569 let service = shared_service.clone();
571
572 let serve_loop_ct = ct.child_token();
575 let peer_return: Peer<R> = peer.clone();
576 let current_span = tracing::Span::current();
577 let handle = tokio::spawn(async move {
578 let mut transport = transport.into_transport();
579 let mut batch_messages = VecDeque::<RxJsonRpcMessage<R>>::new();
580 let mut send_task_set = tokio::task::JoinSet::<SendTaskResult>::new();
581 #[derive(Debug)]
582 enum SendTaskResult {
583 Request {
584 id: RequestId,
585 result: Result<(), DynamicTransportError>,
586 },
587 Notification {
588 responder: Responder<Result<(), ServiceError>>,
589 cancellation_param: Option<CancelledNotificationParam>,
590 result: Result<(), DynamicTransportError>,
591 },
592 }
593 #[derive(Debug)]
594 enum Event<R: ServiceRole> {
595 ProxyMessage(PeerSinkMessage<R>),
596 PeerMessage(RxJsonRpcMessage<R>),
597 ToSink(TxJsonRpcMessage<R>),
598 SendTaskResult(SendTaskResult),
599 }
600
601 let quit_reason = loop {
602 let evt = if let Some(m) = batch_messages.pop_front() {
603 Event::PeerMessage(m)
604 } else {
605 tokio::select! {
606 m = sink_proxy_rx.recv(), if !sink_proxy_rx.is_closed() => {
607 if let Some(m) = m {
608 Event::ToSink(m)
609 } else {
610 continue
611 }
612 }
613 m = transport.receive() => {
614 if let Some(m) = m {
615 Event::PeerMessage(m)
616 } else {
617 tracing::info!("input stream terminated");
619 break QuitReason::Closed
620 }
621 }
622 m = peer_rx.recv(), if !peer_rx.is_closed() => {
623 if let Some(m) = m {
624 Event::ProxyMessage(m)
625 } else {
626 continue
627 }
628 }
629 m = send_task_set.join_next(), if !send_task_set.is_empty() => {
630 let Some(result) = m else {
631 continue
632 };
633 match result {
634 Err(e) => {
635 tracing::error!(%e, "send request task encounter a tokio join error");
637 break QuitReason::JoinError(e)
638 }
639 Ok(result) => {
640 Event::SendTaskResult(result)
641 }
642 }
643 }
644 _ = serve_loop_ct.cancelled() => {
645 tracing::info!("task cancelled");
646 break QuitReason::Cancelled
647 }
648 }
649 };
650
651 tracing::trace!(?evt, "new event");
652 match evt {
653 Event::SendTaskResult(SendTaskResult::Request { id, result }) => {
654 if let Err(e) = result {
655 if let Some(responder) = local_responder_pool.remove(&id) {
656 let _ = responder.send(Err(ServiceError::TransportSend(e)));
657 }
658 }
659 }
660 Event::SendTaskResult(SendTaskResult::Notification {
661 responder,
662 result,
663 cancellation_param,
664 }) => {
665 let response = if let Err(e) = result {
666 Err(ServiceError::TransportSend(e))
667 } else {
668 Ok(())
669 };
670 let _ = responder.send(response);
671 if let Some(param) = cancellation_param {
672 if let Some(responder) = local_responder_pool.remove(¶m.request_id) {
673 tracing::info!(id = %param.request_id, reason = param.reason, "cancelled");
674 let _response_result = responder.send(Err(ServiceError::Cancelled {
675 reason: param.reason.clone(),
676 }));
677 }
678 }
679 }
680 Event::ToSink(m) => {
682 if let Some(id) = match &m {
683 JsonRpcMessage::Response(response) => Some(&response.id),
684 JsonRpcMessage::Error(error) => Some(&error.id),
685 _ => None,
686 } {
687 if let Some(ct) = local_ct_pool.remove(id) {
688 ct.cancel();
689 }
690 let send = transport.send(m);
691 let current_span = tracing::Span::current();
692 tokio::spawn(async move {
693 let send_result = send.await;
694 if let Err(error) = send_result {
695 tracing::error!(%error, "fail to response message");
696 }
697 }.instrument(current_span));
698 }
699 }
700 Event::ProxyMessage(PeerSinkMessage::Request {
701 request,
702 id,
703 responder,
704 }) => {
705 local_responder_pool.insert(id.clone(), responder);
706 let send = transport.send(JsonRpcMessage::request(request, id.clone()));
707 {
708 let id = id.clone();
709 let current_span = tracing::Span::current();
710 send_task_set.spawn(send.map(move |r| SendTaskResult::Request {
711 id,
712 result: r.map_err(DynamicTransportError::new::<T, R>),
713 }).instrument(current_span));
714 }
715 }
716 Event::ProxyMessage(PeerSinkMessage::Notification {
717 notification,
718 responder,
719 }) => {
720 let mut cancellation_param = None;
722 let notification = match notification.try_into() {
723 Ok::<CancelledNotification, _>(cancelled) => {
724 cancellation_param.replace(cancelled.params.clone());
725 cancelled.into()
726 }
727 Err(notification) => notification,
728 };
729 let send = transport.send(JsonRpcMessage::notification(notification));
730 let current_span = tracing::Span::current();
731 send_task_set.spawn(send.map(move |result| SendTaskResult::Notification {
732 responder,
733 cancellation_param,
734 result: result.map_err(DynamicTransportError::new::<T, R>),
735 }).instrument(current_span));
736 }
737 Event::PeerMessage(JsonRpcMessage::Request(JsonRpcRequest {
738 id,
739 mut request,
740 ..
741 })) => {
742 tracing::debug!(%id, ?request, "received request");
743 {
744 let service = shared_service.clone();
745 let sink = sink_proxy_tx.clone();
746 let request_ct = serve_loop_ct.child_token();
747 let context_ct = request_ct.child_token();
748 local_ct_pool.insert(id.clone(), request_ct);
749 let mut extensions = Extensions::new();
750 let mut meta = Meta::new();
751 std::mem::swap(&mut meta, request.get_meta_mut());
754 std::mem::swap(&mut extensions, request.extensions_mut());
755 let context = RequestContext {
756 ct: context_ct,
757 id: id.clone(),
758 peer: peer.clone(),
759 meta,
760 extensions,
761 };
762 let current_span = tracing::Span::current();
763 tokio::spawn(async move {
764 let result = service
765 .handle_request(request, context)
766 .await;
767 let response = match result {
768 Ok(result) => {
769 tracing::debug!(%id, ?result, "response message");
770 JsonRpcMessage::response(result, id)
771 }
772 Err(error) => {
773 tracing::warn!(%id, ?error, "response error");
774 JsonRpcMessage::error(error, id)
775 }
776 };
777 let _send_result = sink.send(response).await;
778 }.instrument(current_span));
779 }
780 }
781 Event::PeerMessage(JsonRpcMessage::Notification(JsonRpcNotification {
782 notification,
783 ..
784 })) => {
785 tracing::info!(?notification, "received notification");
786 let mut notification = match notification.try_into() {
788 Ok::<CancelledNotification, _>(cancelled) => {
789 if let Some(ct) = local_ct_pool.remove(&cancelled.params.request_id) {
790 tracing::info!(id = %cancelled.params.request_id, reason = cancelled.params.reason, "cancelled");
791 ct.cancel();
792 }
793 cancelled.into()
794 }
795 Err(notification) => notification,
796 };
797 {
798 let service = shared_service.clone();
799 let mut extensions = Extensions::new();
800 let mut meta = Meta::new();
801 std::mem::swap(&mut extensions, notification.extensions_mut());
803 std::mem::swap(&mut meta, notification.get_meta_mut());
804 let context = NotificationContext {
805 peer: peer.clone(),
806 meta,
807 extensions,
808 };
809 let current_span = tracing::Span::current();
810 tokio::spawn(async move {
811 let result = service.handle_notification(notification, context).await;
812 if let Err(error) = result {
813 tracing::warn!(%error, "Error sending notification");
814 }
815 }.instrument(current_span));
816 }
817 }
818 Event::PeerMessage(JsonRpcMessage::Response(JsonRpcResponse {
819 result,
820 id,
821 ..
822 })) => {
823 if let Some(responder) = local_responder_pool.remove(&id) {
824 let response_result = responder.send(Ok(result));
825 if let Err(_error) = response_result {
826 tracing::warn!(%id, "Error sending response");
827 }
828 }
829 }
830 Event::PeerMessage(JsonRpcMessage::Error(JsonRpcError { error, id, .. })) => {
831 if let Some(responder) = local_responder_pool.remove(&id) {
832 let _response_result = responder.send(Err(ServiceError::McpError(error)));
833 if let Err(_error) = _response_result {
834 tracing::warn!(%id, "Error sending response");
835 }
836 }
837 }
838 }
839 };
840 let sink_close_result = transport.close().await;
841 if let Err(e) = sink_close_result {
842 tracing::error!(%e, "fail to close sink");
843 }
844 tracing::info!(?quit_reason, "serve finished");
845 quit_reason
846 }.instrument(current_span));
847 RunningService {
848 service,
849 peer: peer_return,
850 handle,
851 cancellation_token: ct.clone(),
852 dg: ct.drop_guard(),
853 }
854}