Skip to main content

rmcp_soddygo/
transport.rs

1//! # Transport
//! A transport type must implement the [`Transport`] trait, which allows it to send messages concurrently and receive messages sequentially.
3//!
4//! ## Standard Transport Types
5//! There are 2 pairs of standard transport types:
6//!
7//! | transport         | client                                                    | server                                                |
8//! |:-:                |:-:                                                        |:-:                                                    |
9//! | std IO            | [`child_process::TokioChildProcess`]                      | [`io::stdio`]                                         |
10//! | streamable http   | [`streamable_http_client::StreamableHttpClientTransport`] | `streamable_http_server::StreamableHttpService`     |
11//!
//! ## Helper Transport Types
//! There are several helper transport types that can help you create a transport quickly.
14//!
15//! ### [Worker Transport](`worker::WorkerTransport`)
16//! Which allows you to run a worker and process messages in another tokio task.
17//!
18//! ### [Async Read/Write Transport](`async_rw::AsyncRwTransport`)
19//! You need to enable `transport-async-rw` feature to use this transport.
20//!
//! This transport is used to create a transport from a byte stream which implements [`tokio::io::AsyncRead`] and [`tokio::io::AsyncWrite`].
22//!
23//! This could be very helpful when you want to create a transport from a byte stream, such as a file or a tcp connection.
24//!
25//! ### [Sink/Stream Transport](`sink_stream::SinkStreamTransport`)
26//! This transport is used to create a transport from a sink and a stream.
27//!
28//! This could be very helpful when you want to create a transport from a duplex object stream, such as a websocket connection.
29//!
30//! ## [IntoTransport](`IntoTransport`) trait
//! [`IntoTransport`] is a helper trait that implicitly converts a type into a transport type.
32//!
//! ### These types automatically implement the [`IntoTransport`] trait
//! 1. A type that already implements both the [`futures::Sink`] and [`futures::Stream`] traits, or a tuple `(Tx, Rx)` where `Tx` is a [`futures::Sink`] and `Rx` is a [`futures::Stream`].
//! 2. A type that implements both the [`tokio::io::AsyncRead`] and [`tokio::io::AsyncWrite`] traits, or a tuple `(R, W)` where `R` is [`tokio::io::AsyncRead`] and `W` is [`tokio::io::AsyncWrite`].
//! 3. A type that implements the [Worker](`worker::Worker`) trait.
//! 4. A type that implements the [`Transport`] trait.
38//!
39//! ## Examples
40//!
41//! ```rust
42//! # use rmcp::{
43//! #     ServiceExt, serve_server,
44//! # };
45//! #[cfg(feature = "client")]
46//! # use rmcp::serve_client;
47//!
48//! // create transport from tcp stream
49//! #[cfg(feature = "client")]
50//! async fn client() -> Result<(), Box<dyn std::error::Error>> {
51//!     let stream = tokio::net::TcpSocket::new_v4()?
52//!         .connect("127.0.0.1:8001".parse()?)
53//!         .await?;
54//!     let client = ().serve(stream).await?;
55//!     let tools = client.peer().list_tools(Default::default()).await?;
56//!     println!("{:?}", tools);
57//!     Ok(())
58//! }
59//!
60//! // create transport from std io
61//! #[cfg(feature = "client")]
62//! async fn io()  -> Result<(), Box<dyn std::error::Error>> {
63//!     let client = ().serve((tokio::io::stdin(), tokio::io::stdout())).await?;
64//!     let tools = client.peer().list_tools(Default::default()).await?;
65//!     println!("{:?}", tools);
66//!     Ok(())
67//! }
68//! ```
69
70use std::{borrow::Cow, sync::Arc};
71
72use crate::service::{RxJsonRpcMessage, ServiceRole, TxJsonRpcMessage};
73
74pub mod sink_stream;
75
76#[cfg(feature = "transport-async-rw")]
77pub mod async_rw;
78
79#[cfg(feature = "transport-worker")]
80pub mod worker;
81#[cfg(feature = "transport-worker")]
82pub use worker::WorkerTransport;
83
84#[cfg(feature = "transport-child-process")]
85pub mod child_process;
86#[cfg(feature = "which-command")]
87pub use child_process::which_command;
88#[cfg(feature = "transport-child-process")]
89pub use child_process::{ConfigureCommandExt, TokioChildProcess};
90
91#[cfg(feature = "transport-io")]
92pub mod io;
93#[cfg(feature = "transport-io")]
94pub use io::stdio;
95
96#[cfg(feature = "auth")]
97pub mod auth;
98#[cfg(feature = "auth-client-credentials-jwt")]
99pub use auth::JwtSigningAlgorithm;
100#[cfg(feature = "auth")]
101pub use auth::{
102    AuthClient, AuthError, AuthorizationManager, AuthorizationSession, AuthorizedHttpClient,
103    ClientCredentialsConfig, CredentialStore, EXTENSION_OAUTH_CLIENT_CREDENTIALS,
104    InMemoryCredentialStore, InMemoryStateStore, ScopeUpgradeConfig, StateStore,
105    StoredAuthorizationState, StoredCredentials, WWWAuthenticateParams,
106};
107
108// #[cfg(feature = "transport-ws")]
109// pub mod ws;
110#[cfg(feature = "transport-streamable-http-server-session")]
111pub mod streamable_http_server;
112#[cfg(all(feature = "transport-streamable-http-server", not(feature = "local")))]
113pub use streamable_http_server::tower::{StreamableHttpServerConfig, StreamableHttpService};
114
115#[cfg(feature = "transport-streamable-http-client")]
116pub mod streamable_http_client;
117#[cfg(all(unix, feature = "transport-streamable-http-client-unix-socket"))]
118pub use common::unix_socket::UnixSocketHttpClient;
119#[cfg(feature = "transport-streamable-http-client")]
120pub use streamable_http_client::StreamableHttpClientTransport;
121
/// Commonly used code.
123pub mod common;
124
125pub trait Transport<R>: Send
126where
127    R: ServiceRole,
128{
129    type Error: std::error::Error + Send + Sync + 'static;
130    fn name() -> Cow<'static, str> {
131        std::any::type_name::<Self>().into()
132    }
133    /// Send a message to the transport
134    ///
135    /// Notice that the future returned by this function should be `Send` and `'static`.
136    /// It's because the sending message could be executed concurrently.
137    ///
138    fn send(
139        &mut self,
140        item: TxJsonRpcMessage<R>,
141    ) -> impl Future<Output = Result<(), Self::Error>> + Send + 'static;
142
143    /// Receive a message from the transport, this operation is sequential.
144    fn receive(&mut self) -> impl Future<Output = Option<RxJsonRpcMessage<R>>> + Send;
145
146    /// Close the transport
147    fn close(&mut self) -> impl Future<Output = Result<(), Self::Error>> + Send;
148}
149
150pub trait IntoTransport<R, E, A>: Send + 'static
151where
152    R: ServiceRole,
153    E: std::error::Error + Send + 'static,
154{
155    fn into_transport(self) -> impl Transport<R, Error = E> + 'static;
156}
157
158#[non_exhaustive]
159pub enum TransportAdapterIdentity {}
160impl<R, T, E> IntoTransport<R, E, TransportAdapterIdentity> for T
161where
162    T: Transport<R, Error = E> + Send + 'static,
163    R: ServiceRole,
164    E: std::error::Error + Send + Sync + 'static,
165{
166    fn into_transport(self) -> impl Transport<R, Error = E> + 'static {
167        self
168    }
169}
170
171/// A transport that can send a single message and then close itself
172pub struct OneshotTransport<R>
173where
174    R: ServiceRole,
175{
176    message: Option<RxJsonRpcMessage<R>>,
177    sender: tokio::sync::mpsc::Sender<TxJsonRpcMessage<R>>,
178    termination: Arc<tokio::sync::Semaphore>,
179}
180
181impl<R> OneshotTransport<R>
182where
183    R: ServiceRole,
184{
185    pub fn new(
186        message: RxJsonRpcMessage<R>,
187    ) -> (Self, tokio::sync::mpsc::Receiver<TxJsonRpcMessage<R>>) {
188        let (sender, receiver) = tokio::sync::mpsc::channel(16);
189        (
190            Self {
191                message: Some(message),
192                sender,
193                termination: Arc::new(tokio::sync::Semaphore::new(0)),
194            },
195            receiver,
196        )
197    }
198}
199
200impl<R> Transport<R> for OneshotTransport<R>
201where
202    R: ServiceRole,
203{
204    type Error = tokio::sync::mpsc::error::SendError<TxJsonRpcMessage<R>>;
205
206    fn send(
207        &mut self,
208        item: TxJsonRpcMessage<R>,
209    ) -> impl Future<Output = Result<(), Self::Error>> + Send + 'static {
210        let sender = self.sender.clone();
211        let terminate = matches!(item, TxJsonRpcMessage::<R>::Response(_))
212            || matches!(item, TxJsonRpcMessage::<R>::Error(_));
213        let termination = self.termination.clone();
214        async move {
215            sender.send(item).await?;
216            if terminate {
217                termination.add_permits(1);
218            }
219            Ok(())
220        }
221    }
222
223    async fn receive(&mut self) -> Option<RxJsonRpcMessage<R>> {
224        if let Some(msg) = self.message.take() {
225            return Some(msg);
226        }
227        let _ = self.termination.acquire().await;
228        None
229    }
230
231    fn close(&mut self) -> impl Future<Output = Result<(), Self::Error>> + Send {
232        self.message.take();
233        std::future::ready(Ok(()))
234    }
235}
236
237#[derive(Debug, thiserror::Error)]
238#[error("Transport [{transport_name}] error: {error}")]
239#[non_exhaustive]
240pub struct DynamicTransportError {
241    pub transport_name: Cow<'static, str>,
242    pub transport_type_id: std::any::TypeId,
243    #[source]
244    pub error: Box<dyn std::error::Error + Send + Sync>,
245}
246
247impl DynamicTransportError {
248    pub fn new<T: Transport<R> + 'static, R: ServiceRole>(e: T::Error) -> Self {
249        Self {
250            transport_name: T::name(),
251            transport_type_id: std::any::TypeId::of::<T>(),
252            error: Box::new(e),
253        }
254    }
255
256    /// Create a `DynamicTransportError` from raw parts.
257    ///
258    /// Unlike [`new`](Self::new), this does not require a concrete [`Transport`] type,
259    /// making it usable in test fixtures and other contexts where a real transport
260    /// implementation is not available.
261    pub fn from_parts(
262        transport_name: impl Into<Cow<'static, str>>,
263        transport_type_id: std::any::TypeId,
264        error: Box<dyn std::error::Error + Send + Sync>,
265    ) -> Self {
266        Self {
267            transport_name: transport_name.into(),
268            transport_type_id,
269            error,
270        }
271    }
272
273    pub fn downcast<T: Transport<R> + 'static, R: ServiceRole>(self) -> Result<T::Error, Self> {
274        if !self.is::<T, R>() {
275            Err(self)
276        } else {
277            Ok(self
278                .error
279                .downcast::<T::Error>()
280                .map(|e| *e)
281                .expect("type is checked"))
282        }
283    }
284    pub fn is<T: Transport<R> + 'static, R: ServiceRole>(&self) -> bool {
285        self.error.is::<T::Error>() && self.transport_type_id == std::any::TypeId::of::<T>()
286    }
287}