Skip to main content

rmcp_soddygo/
transport.rs

1//! # Transport
2//! The transport type must implemented [`Transport`] trait, which allow it send message concurrently and receive message sequentially.
3//!
4//! ## Standard Transport Types
5//! There are 2 pairs of standard transport types:
6//!
7//! | transport         | client                                                    | server                                                |
8//! |:-:                |:-:                                                        |:-:                                                    |
9//! | std IO            | [`child_process::TokioChildProcess`]                      | [`io::stdio`]                                         |
10//! | streamable http   | [`streamable_http_client::StreamableHttpClientTransport`] | [`streamable_http_server::StreamableHttpService`]     |
11//!
12//!## Helper Transport Types
13//! Thers are several helper transport types that can help you to create transport quickly.
14//!
15//! ### [Worker Transport](`worker::WorkerTransport`)
16//! Which allows you to run a worker and process messages in another tokio task.
17//!
18//! ### [Async Read/Write Transport](`async_rw::AsyncRwTransport`)
19//! You need to enable `transport-async-rw` feature to use this transport.
20//!
21//! This transport is used to create a transport from a byte stream which implemented [`tokio::io::AsyncRead`] and [`tokio::io::AsyncWrite`].
22//!
23//! This could be very helpful when you want to create a transport from a byte stream, such as a file or a tcp connection.
24//!
25//! ### [Sink/Stream Transport](`sink_stream::SinkStreamTransport`)
26//! This transport is used to create a transport from a sink and a stream.
27//!
28//! This could be very helpful when you want to create a transport from a duplex object stream, such as a websocket connection.
29//!
30//! ## [IntoTransport](`IntoTransport`) trait
31//! [`IntoTransport`] is a helper trait that implicitly convert a type into a transport type.
32//!
33//! ### These types is automatically implemented [`IntoTransport`] trait
34//! 1. A type that already implement both [`futures::Sink`] and [`futures::Stream`] trait, or a tuple `(Tx, Rx)`  where `Tx` is [`futures::Sink`] and `Rx` is [`futures::Stream`].
35//! 2. A type that implement both [`tokio::io::AsyncRead`] and [`tokio::io::AsyncWrite`] trait. or a tuple `(R, W)` where `R` is [`tokio::io::AsyncRead`] and `W` is [`tokio::io::AsyncWrite`].
36//! 3. A type that implement [Worker](`worker::Worker`) trait.
37//! 4. A type that implement [`Transport`] trait.
38//!
39//! ## Examples
40//!
41//! ```rust
42//! # use rmcp::{
43//! #     ServiceExt, serve_server,
44//! # };
45//! #[cfg(feature = "client")]
46//! # use rmcp::serve_client;
47//!
48//! // create transport from tcp stream
49//! #[cfg(feature = "client")]
50//! async fn client() -> Result<(), Box<dyn std::error::Error>> {
51//!     let stream = tokio::net::TcpSocket::new_v4()?
52//!         .connect("127.0.0.1:8001".parse()?)
53//!         .await?;
54//!     let client = ().serve(stream).await?;
55//!     let tools = client.peer().list_tools(Default::default()).await?;
56//!     println!("{:?}", tools);
57//!     Ok(())
58//! }
59//!
60//! // create transport from std io
61//! #[cfg(feature = "client")]
62//! async fn io()  -> Result<(), Box<dyn std::error::Error>> {
63//!     let client = ().serve((tokio::io::stdin(), tokio::io::stdout())).await?;
64//!     let tools = client.peer().list_tools(Default::default()).await?;
65//!     println!("{:?}", tools);
66//!     Ok(())
67//! }
68//! ```
69
70use std::{borrow::Cow, sync::Arc};
71
72use crate::service::{RxJsonRpcMessage, ServiceRole, TxJsonRpcMessage};
73
74pub mod sink_stream;
75
76#[cfg(feature = "transport-async-rw")]
77pub mod async_rw;
78
79#[cfg(feature = "transport-worker")]
80pub mod worker;
81#[cfg(feature = "transport-worker")]
82pub use worker::WorkerTransport;
83
84#[cfg(feature = "transport-child-process")]
85pub mod child_process;
86#[cfg(feature = "transport-child-process")]
87pub use child_process::{ConfigureCommandExt, TokioChildProcess};
88
89#[cfg(feature = "transport-io")]
90pub mod io;
91#[cfg(feature = "transport-io")]
92pub use io::stdio;
93
94#[cfg(feature = "auth")]
95pub mod auth;
96#[cfg(feature = "auth-client-credentials-jwt")]
97pub use auth::JwtSigningAlgorithm;
98#[cfg(feature = "auth")]
99pub use auth::{
100    AuthClient, AuthError, AuthorizationManager, AuthorizationSession, AuthorizedHttpClient,
101    ClientCredentialsConfig, CredentialStore, EXTENSION_OAUTH_CLIENT_CREDENTIALS,
102    InMemoryCredentialStore, InMemoryStateStore, ScopeUpgradeConfig, StateStore,
103    StoredAuthorizationState, StoredCredentials, WWWAuthenticateParams,
104};
105
106// #[cfg(feature = "transport-ws")]
107// pub mod ws;
108#[cfg(feature = "transport-streamable-http-server-session")]
109pub mod streamable_http_server;
110#[cfg(feature = "transport-streamable-http-server")]
111pub use streamable_http_server::tower::{StreamableHttpServerConfig, StreamableHttpService};
112
113#[cfg(feature = "transport-streamable-http-client")]
114pub mod streamable_http_client;
115#[cfg(feature = "transport-streamable-http-client")]
116pub use streamable_http_client::StreamableHttpClientTransport;
117
118/// Common use codes
119pub mod common;
120
121pub trait Transport<R>: Send
122where
123    R: ServiceRole,
124{
125    type Error: std::error::Error + Send + Sync + 'static;
126    fn name() -> Cow<'static, str> {
127        std::any::type_name::<Self>().into()
128    }
129    /// Send a message to the transport
130    ///
131    /// Notice that the future returned by this function should be `Send` and `'static`.
132    /// It's because the sending message could be executed concurrently.
133    ///
134    fn send(
135        &mut self,
136        item: TxJsonRpcMessage<R>,
137    ) -> impl Future<Output = Result<(), Self::Error>> + Send + 'static;
138
139    /// Receive a message from the transport, this operation is sequential.
140    fn receive(&mut self) -> impl Future<Output = Option<RxJsonRpcMessage<R>>> + Send;
141
142    /// Close the transport
143    fn close(&mut self) -> impl Future<Output = Result<(), Self::Error>> + Send;
144}
145
146pub trait IntoTransport<R, E, A>: Send + 'static
147where
148    R: ServiceRole,
149    E: std::error::Error + Send + 'static,
150{
151    fn into_transport(self) -> impl Transport<R, Error = E> + 'static;
152}
153
154pub enum TransportAdapterIdentity {}
155impl<R, T, E> IntoTransport<R, E, TransportAdapterIdentity> for T
156where
157    T: Transport<R, Error = E> + Send + 'static,
158    R: ServiceRole,
159    E: std::error::Error + Send + Sync + 'static,
160{
161    fn into_transport(self) -> impl Transport<R, Error = E> + 'static {
162        self
163    }
164}
165
166/// A transport that can send a single message and then close itself
167pub struct OneshotTransport<R>
168where
169    R: ServiceRole,
170{
171    message: Option<RxJsonRpcMessage<R>>,
172    sender: tokio::sync::mpsc::Sender<TxJsonRpcMessage<R>>,
173    termination: Arc<tokio::sync::Semaphore>,
174}
175
176impl<R> OneshotTransport<R>
177where
178    R: ServiceRole,
179{
180    pub fn new(
181        message: RxJsonRpcMessage<R>,
182    ) -> (Self, tokio::sync::mpsc::Receiver<TxJsonRpcMessage<R>>) {
183        let (sender, receiver) = tokio::sync::mpsc::channel(16);
184        (
185            Self {
186                message: Some(message),
187                sender,
188                termination: Arc::new(tokio::sync::Semaphore::new(0)),
189            },
190            receiver,
191        )
192    }
193}
194
195impl<R> Transport<R> for OneshotTransport<R>
196where
197    R: ServiceRole,
198{
199    type Error = tokio::sync::mpsc::error::SendError<TxJsonRpcMessage<R>>;
200
201    fn send(
202        &mut self,
203        item: TxJsonRpcMessage<R>,
204    ) -> impl Future<Output = Result<(), Self::Error>> + Send + 'static {
205        let sender = self.sender.clone();
206        let terminate = matches!(item, TxJsonRpcMessage::<R>::Response(_))
207            || matches!(item, TxJsonRpcMessage::<R>::Error(_));
208        let termination = self.termination.clone();
209        async move {
210            sender.send(item).await?;
211            if terminate {
212                termination.add_permits(1);
213            }
214            Ok(())
215        }
216    }
217
218    async fn receive(&mut self) -> Option<RxJsonRpcMessage<R>> {
219        if let Some(msg) = self.message.take() {
220            return Some(msg);
221        }
222        let _ = self.termination.acquire().await;
223        None
224    }
225
226    fn close(&mut self) -> impl Future<Output = Result<(), Self::Error>> + Send {
227        self.message.take();
228        std::future::ready(Ok(()))
229    }
230}
231
232#[derive(Debug, thiserror::Error)]
233#[error("Transport [{transport_name}] error: {error}")]
234pub struct DynamicTransportError {
235    pub transport_name: Cow<'static, str>,
236    pub transport_type_id: std::any::TypeId,
237    #[source]
238    pub error: Box<dyn std::error::Error + Send + Sync>,
239}
240
241impl DynamicTransportError {
242    pub fn new<T: Transport<R> + 'static, R: ServiceRole>(e: T::Error) -> Self {
243        Self {
244            transport_name: T::name(),
245            transport_type_id: std::any::TypeId::of::<T>(),
246            error: Box::new(e),
247        }
248    }
249    pub fn downcast<T: Transport<R> + 'static, R: ServiceRole>(self) -> Result<T::Error, Self> {
250        if !self.is::<T, R>() {
251            Err(self)
252        } else {
253            Ok(self
254                .error
255                .downcast::<T::Error>()
256                .map(|e| *e)
257                .expect("type is checked"))
258        }
259    }
260    pub fn is<T: Transport<R> + 'static, R: ServiceRole>(&self) -> bool {
261        self.error.is::<T::Error>() && self.transport_type_id == std::any::TypeId::of::<T>()
262    }
263}