rmcp_soddygo/
transport.rs1use std::{borrow::Cow, sync::Arc};
71
72use crate::service::{RxJsonRpcMessage, ServiceRole, TxJsonRpcMessage};
73
74pub mod sink_stream;
75
76#[cfg(feature = "transport-async-rw")]
77pub mod async_rw;
78
79#[cfg(feature = "transport-worker")]
80pub mod worker;
81#[cfg(feature = "transport-worker")]
82pub use worker::WorkerTransport;
83
84#[cfg(feature = "transport-child-process")]
85pub mod child_process;
86#[cfg(feature = "which-command")]
87pub use child_process::which_command;
88#[cfg(feature = "transport-child-process")]
89pub use child_process::{ConfigureCommandExt, TokioChildProcess};
90
91#[cfg(feature = "transport-io")]
92pub mod io;
93#[cfg(feature = "transport-io")]
94pub use io::stdio;
95
96#[cfg(feature = "auth")]
97pub mod auth;
98#[cfg(feature = "auth-client-credentials-jwt")]
99pub use auth::JwtSigningAlgorithm;
100#[cfg(feature = "auth")]
101pub use auth::{
102 AuthClient, AuthError, AuthorizationManager, AuthorizationSession, AuthorizedHttpClient,
103 ClientCredentialsConfig, CredentialStore, EXTENSION_OAUTH_CLIENT_CREDENTIALS,
104 InMemoryCredentialStore, InMemoryStateStore, ScopeUpgradeConfig, StateStore,
105 StoredAuthorizationState, StoredCredentials, WWWAuthenticateParams,
106};
107
108#[cfg(feature = "transport-streamable-http-server-session")]
111pub mod streamable_http_server;
112#[cfg(all(feature = "transport-streamable-http-server", not(feature = "local")))]
113pub use streamable_http_server::tower::{StreamableHttpServerConfig, StreamableHttpService};
114
115#[cfg(feature = "transport-streamable-http-client")]
116pub mod streamable_http_client;
117#[cfg(all(unix, feature = "transport-streamable-http-client-unix-socket"))]
118pub use common::unix_socket::UnixSocketHttpClient;
119#[cfg(feature = "transport-streamable-http-client")]
120pub use streamable_http_client::StreamableHttpClientTransport;
121
122pub mod common;
124
125pub trait Transport<R>: Send
126where
127 R: ServiceRole,
128{
129 type Error: std::error::Error + Send + Sync + 'static;
130 fn name() -> Cow<'static, str> {
131 std::any::type_name::<Self>().into()
132 }
133 fn send(
139 &mut self,
140 item: TxJsonRpcMessage<R>,
141 ) -> impl Future<Output = Result<(), Self::Error>> + Send + 'static;
142
143 fn receive(&mut self) -> impl Future<Output = Option<RxJsonRpcMessage<R>>> + Send;
145
146 fn close(&mut self) -> impl Future<Output = Result<(), Self::Error>> + Send;
148}
149
150pub trait IntoTransport<R, E, A>: Send + 'static
151where
152 R: ServiceRole,
153 E: std::error::Error + Send + 'static,
154{
155 fn into_transport(self) -> impl Transport<R, Error = E> + 'static;
156}
157
158#[non_exhaustive]
159pub enum TransportAdapterIdentity {}
160impl<R, T, E> IntoTransport<R, E, TransportAdapterIdentity> for T
161where
162 T: Transport<R, Error = E> + Send + 'static,
163 R: ServiceRole,
164 E: std::error::Error + Send + Sync + 'static,
165{
166 fn into_transport(self) -> impl Transport<R, Error = E> + 'static {
167 self
168 }
169}
170
171pub struct OneshotTransport<R>
173where
174 R: ServiceRole,
175{
176 message: Option<RxJsonRpcMessage<R>>,
177 sender: tokio::sync::mpsc::Sender<TxJsonRpcMessage<R>>,
178 termination: Arc<tokio::sync::Semaphore>,
179}
180
181impl<R> OneshotTransport<R>
182where
183 R: ServiceRole,
184{
185 pub fn new(
186 message: RxJsonRpcMessage<R>,
187 ) -> (Self, tokio::sync::mpsc::Receiver<TxJsonRpcMessage<R>>) {
188 let (sender, receiver) = tokio::sync::mpsc::channel(16);
189 (
190 Self {
191 message: Some(message),
192 sender,
193 termination: Arc::new(tokio::sync::Semaphore::new(0)),
194 },
195 receiver,
196 )
197 }
198}
199
200impl<R> Transport<R> for OneshotTransport<R>
201where
202 R: ServiceRole,
203{
204 type Error = tokio::sync::mpsc::error::SendError<TxJsonRpcMessage<R>>;
205
206 fn send(
207 &mut self,
208 item: TxJsonRpcMessage<R>,
209 ) -> impl Future<Output = Result<(), Self::Error>> + Send + 'static {
210 let sender = self.sender.clone();
211 let terminate = matches!(item, TxJsonRpcMessage::<R>::Response(_))
212 || matches!(item, TxJsonRpcMessage::<R>::Error(_));
213 let termination = self.termination.clone();
214 async move {
215 sender.send(item).await?;
216 if terminate {
217 termination.add_permits(1);
218 }
219 Ok(())
220 }
221 }
222
223 async fn receive(&mut self) -> Option<RxJsonRpcMessage<R>> {
224 if let Some(msg) = self.message.take() {
225 return Some(msg);
226 }
227 let _ = self.termination.acquire().await;
228 None
229 }
230
231 fn close(&mut self) -> impl Future<Output = Result<(), Self::Error>> + Send {
232 self.message.take();
233 std::future::ready(Ok(()))
234 }
235}
236
237#[derive(Debug, thiserror::Error)]
238#[error("Transport [{transport_name}] error: {error}")]
239#[non_exhaustive]
240pub struct DynamicTransportError {
241 pub transport_name: Cow<'static, str>,
242 pub transport_type_id: std::any::TypeId,
243 #[source]
244 pub error: Box<dyn std::error::Error + Send + Sync>,
245}
246
247impl DynamicTransportError {
248 pub fn new<T: Transport<R> + 'static, R: ServiceRole>(e: T::Error) -> Self {
249 Self {
250 transport_name: T::name(),
251 transport_type_id: std::any::TypeId::of::<T>(),
252 error: Box::new(e),
253 }
254 }
255
256 pub fn from_parts(
262 transport_name: impl Into<Cow<'static, str>>,
263 transport_type_id: std::any::TypeId,
264 error: Box<dyn std::error::Error + Send + Sync>,
265 ) -> Self {
266 Self {
267 transport_name: transport_name.into(),
268 transport_type_id,
269 error,
270 }
271 }
272
273 pub fn downcast<T: Transport<R> + 'static, R: ServiceRole>(self) -> Result<T::Error, Self> {
274 if !self.is::<T, R>() {
275 Err(self)
276 } else {
277 Ok(self
278 .error
279 .downcast::<T::Error>()
280 .map(|e| *e)
281 .expect("type is checked"))
282 }
283 }
284 pub fn is<T: Transport<R> + 'static, R: ServiceRole>(&self) -> bool {
285 self.error.is::<T::Error>() && self.transport_type_id == std::any::TypeId::of::<T>()
286 }
287}