rmcp_soddygo/
transport.rs1use std::{borrow::Cow, sync::Arc};
71
72use crate::service::{RxJsonRpcMessage, ServiceRole, TxJsonRpcMessage};
73
74pub mod sink_stream;
75
76#[cfg(feature = "transport-async-rw")]
77pub mod async_rw;
78
79#[cfg(feature = "transport-worker")]
80pub mod worker;
81#[cfg(feature = "transport-worker")]
82pub use worker::WorkerTransport;
83
84#[cfg(feature = "transport-child-process")]
85pub mod child_process;
86#[cfg(feature = "transport-child-process")]
87pub use child_process::{ConfigureCommandExt, TokioChildProcess};
88
89#[cfg(feature = "transport-io")]
90pub mod io;
91#[cfg(feature = "transport-io")]
92pub use io::stdio;
93
94#[cfg(feature = "auth")]
95pub mod auth;
96#[cfg(feature = "auth-client-credentials-jwt")]
97pub use auth::JwtSigningAlgorithm;
98#[cfg(feature = "auth")]
99pub use auth::{
100 AuthClient, AuthError, AuthorizationManager, AuthorizationSession, AuthorizedHttpClient,
101 ClientCredentialsConfig, CredentialStore, EXTENSION_OAUTH_CLIENT_CREDENTIALS,
102 InMemoryCredentialStore, InMemoryStateStore, ScopeUpgradeConfig, StateStore,
103 StoredAuthorizationState, StoredCredentials, WWWAuthenticateParams,
104};
105
106#[cfg(feature = "transport-streamable-http-server-session")]
109pub mod streamable_http_server;
110#[cfg(feature = "transport-streamable-http-server")]
111pub use streamable_http_server::tower::{StreamableHttpServerConfig, StreamableHttpService};
112
113#[cfg(feature = "transport-streamable-http-client")]
114pub mod streamable_http_client;
115#[cfg(feature = "transport-streamable-http-client")]
116pub use streamable_http_client::StreamableHttpClientTransport;
117
118pub mod common;
120
121pub trait Transport<R>: Send
122where
123 R: ServiceRole,
124{
125 type Error: std::error::Error + Send + Sync + 'static;
126 fn name() -> Cow<'static, str> {
127 std::any::type_name::<Self>().into()
128 }
129 fn send(
135 &mut self,
136 item: TxJsonRpcMessage<R>,
137 ) -> impl Future<Output = Result<(), Self::Error>> + Send + 'static;
138
139 fn receive(&mut self) -> impl Future<Output = Option<RxJsonRpcMessage<R>>> + Send;
141
142 fn close(&mut self) -> impl Future<Output = Result<(), Self::Error>> + Send;
144}
145
146pub trait IntoTransport<R, E, A>: Send + 'static
147where
148 R: ServiceRole,
149 E: std::error::Error + Send + 'static,
150{
151 fn into_transport(self) -> impl Transport<R, Error = E> + 'static;
152}
153
154pub enum TransportAdapterIdentity {}
155impl<R, T, E> IntoTransport<R, E, TransportAdapterIdentity> for T
156where
157 T: Transport<R, Error = E> + Send + 'static,
158 R: ServiceRole,
159 E: std::error::Error + Send + Sync + 'static,
160{
161 fn into_transport(self) -> impl Transport<R, Error = E> + 'static {
162 self
163 }
164}
165
166pub struct OneshotTransport<R>
168where
169 R: ServiceRole,
170{
171 message: Option<RxJsonRpcMessage<R>>,
172 sender: tokio::sync::mpsc::Sender<TxJsonRpcMessage<R>>,
173 termination: Arc<tokio::sync::Semaphore>,
174}
175
176impl<R> OneshotTransport<R>
177where
178 R: ServiceRole,
179{
180 pub fn new(
181 message: RxJsonRpcMessage<R>,
182 ) -> (Self, tokio::sync::mpsc::Receiver<TxJsonRpcMessage<R>>) {
183 let (sender, receiver) = tokio::sync::mpsc::channel(16);
184 (
185 Self {
186 message: Some(message),
187 sender,
188 termination: Arc::new(tokio::sync::Semaphore::new(0)),
189 },
190 receiver,
191 )
192 }
193}
194
195impl<R> Transport<R> for OneshotTransport<R>
196where
197 R: ServiceRole,
198{
199 type Error = tokio::sync::mpsc::error::SendError<TxJsonRpcMessage<R>>;
200
201 fn send(
202 &mut self,
203 item: TxJsonRpcMessage<R>,
204 ) -> impl Future<Output = Result<(), Self::Error>> + Send + 'static {
205 let sender = self.sender.clone();
206 let terminate = matches!(item, TxJsonRpcMessage::<R>::Response(_))
207 || matches!(item, TxJsonRpcMessage::<R>::Error(_));
208 let termination = self.termination.clone();
209 async move {
210 sender.send(item).await?;
211 if terminate {
212 termination.add_permits(1);
213 }
214 Ok(())
215 }
216 }
217
218 async fn receive(&mut self) -> Option<RxJsonRpcMessage<R>> {
219 if let Some(msg) = self.message.take() {
220 return Some(msg);
221 }
222 let _ = self.termination.acquire().await;
223 None
224 }
225
226 fn close(&mut self) -> impl Future<Output = Result<(), Self::Error>> + Send {
227 self.message.take();
228 std::future::ready(Ok(()))
229 }
230}
231
232#[derive(Debug, thiserror::Error)]
233#[error("Transport [{transport_name}] error: {error}")]
234pub struct DynamicTransportError {
235 pub transport_name: Cow<'static, str>,
236 pub transport_type_id: std::any::TypeId,
237 #[source]
238 pub error: Box<dyn std::error::Error + Send + Sync>,
239}
240
241impl DynamicTransportError {
242 pub fn new<T: Transport<R> + 'static, R: ServiceRole>(e: T::Error) -> Self {
243 Self {
244 transport_name: T::name(),
245 transport_type_id: std::any::TypeId::of::<T>(),
246 error: Box::new(e),
247 }
248 }
249 pub fn downcast<T: Transport<R> + 'static, R: ServiceRole>(self) -> Result<T::Error, Self> {
250 if !self.is::<T, R>() {
251 Err(self)
252 } else {
253 Ok(self
254 .error
255 .downcast::<T::Error>()
256 .map(|e| *e)
257 .expect("type is checked"))
258 }
259 }
260 pub fn is<T: Transport<R> + 'static, R: ServiceRole>(&self) -> bool {
261 self.error.is::<T::Error>() && self.transport_type_id == std::any::TypeId::of::<T>()
262 }
263}