use std::{borrow::Cow, sync::Arc};
use crate::service::{RxJsonRpcMessage, ServiceRole, TxJsonRpcMessage};
pub mod sink_stream;
#[cfg(feature = "transport-async-rw")]
pub mod async_rw;
#[cfg(feature = "transport-worker")]
pub mod worker;
#[cfg(feature = "transport-worker")]
pub use worker::WorkerTransport;
#[cfg(feature = "transport-child-process")]
pub mod child_process;
#[cfg(feature = "which-command")]
pub use child_process::which_command;
#[cfg(feature = "transport-child-process")]
pub use child_process::{ConfigureCommandExt, TokioChildProcess};
#[cfg(feature = "transport-io")]
pub mod io;
#[cfg(feature = "transport-io")]
pub use io::stdio;
#[cfg(feature = "auth")]
pub mod auth;
#[cfg(feature = "auth-client-credentials-jwt")]
pub use auth::JwtSigningAlgorithm;
#[cfg(feature = "auth")]
pub use auth::{
AuthClient, AuthError, AuthorizationManager, AuthorizationSession, AuthorizedHttpClient,
ClientCredentialsConfig, CredentialStore, EXTENSION_OAUTH_CLIENT_CREDENTIALS,
InMemoryCredentialStore, InMemoryStateStore, ScopeUpgradeConfig, StateStore,
StoredAuthorizationState, StoredCredentials, WWWAuthenticateParams,
};
#[cfg(feature = "transport-streamable-http-server-session")]
pub mod streamable_http_server;
#[cfg(all(feature = "transport-streamable-http-server", not(feature = "local")))]
pub use streamable_http_server::tower::{StreamableHttpServerConfig, StreamableHttpService};
#[cfg(feature = "transport-streamable-http-client")]
pub mod streamable_http_client;
#[cfg(all(unix, feature = "transport-streamable-http-client-unix-socket"))]
pub use common::unix_socket::UnixSocketHttpClient;
#[cfg(feature = "transport-streamable-http-client")]
pub use streamable_http_client::StreamableHttpClientTransport;
pub mod common;
/// A message transport used by a service acting in role `R`.
///
/// Outbound traffic is expressed as [`TxJsonRpcMessage<R>`] and inbound traffic as
/// [`RxJsonRpcMessage<R>`]. Implementors must be `Send`.
pub trait Transport<R: ServiceRole>: Send {
    /// Error reported when sending or closing fails.
    type Error: std::error::Error + Send + Sync + 'static;

    /// Name identifying this transport type.
    ///
    /// The default implementation returns the implementing type's name as
    /// reported by [`std::any::type_name`].
    fn name() -> Cow<'static, str> {
        Cow::Borrowed(std::any::type_name::<Self>())
    }

    /// Sends one outbound message.
    ///
    /// The returned future is `Send + 'static`, so it cannot borrow from `self`;
    /// implementations have to move or clone whatever they need into it.
    fn send(
        &mut self,
        item: TxJsonRpcMessage<R>,
    ) -> impl Future<Output = Result<(), Self::Error>> + Send + 'static;

    /// Waits for the next inbound message.
    ///
    /// NOTE(review): `None` presumably means no further messages will arrive
    /// (that is how `OneshotTransport` uses it) — confirm against the service loop.
    fn receive(&mut self) -> impl Future<Output = Option<RxJsonRpcMessage<R>>> + Send;

    /// Closes the transport, reporting any failure as [`Self::Error`].
    fn close(&mut self) -> impl Future<Output = Result<(), Self::Error>> + Send;
}
pub trait IntoTransport<R, E, A>: Send + 'static
where
R: ServiceRole,
E: std::error::Error + Send + 'static,
{
fn into_transport(self) -> impl Transport<R, Error = E> + 'static;
}
/// Marker type used as the adapter parameter `A` of [`IntoTransport`] by the
/// blanket implementation for types that already implement [`Transport`].
///
/// The enum has no variants, so no value of it can be constructed; it is only
/// ever used at the type level.
#[non_exhaustive]
pub enum TransportAdapterIdentity {}
/// Identity conversion: every `'static` [`Transport`] is trivially convertible
/// into a transport, tagged with the [`TransportAdapterIdentity`] marker.
impl<R, T, E> IntoTransport<R, E, TransportAdapterIdentity> for T
where
    R: ServiceRole,
    E: std::error::Error + Send + Sync + 'static,
    T: Transport<R, Error = E> + Send + 'static,
{
    /// Returns `self` unchanged.
    fn into_transport(self) -> impl Transport<R, Error = E> + 'static {
        self
    }
}
/// A transport that delivers exactly one pre-supplied inbound message and
/// forwards everything sent through it into an mpsc channel.
///
/// After the single message has been handed out, `receive` waits until a
/// response or error message has been sent and then yields `None`.
pub struct OneshotTransport<R: ServiceRole> {
    /// The one inbound message; taken by the first `receive` call (or by `close`).
    message: Option<RxJsonRpcMessage<R>>,
    /// Channel end that every outbound message is pushed into.
    sender: tokio::sync::mpsc::Sender<TxJsonRpcMessage<R>>,
    /// Starts with zero permits; `send` adds one after a response or error has
    /// gone out, which unblocks a pending `receive`.
    termination: Arc<tokio::sync::Semaphore>,
}
impl<R: ServiceRole> OneshotTransport<R> {
    /// Creates a transport that will deliver `message` once.
    ///
    /// Returns the transport together with the receiving half of a bounded
    /// channel (capacity 16) on which every message passed to `send` shows up.
    pub fn new(
        message: RxJsonRpcMessage<R>,
    ) -> (Self, tokio::sync::mpsc::Receiver<TxJsonRpcMessage<R>>) {
        let (outbound_tx, outbound_rx) = tokio::sync::mpsc::channel(16);
        // Zero permits: nothing has been answered yet, so `receive` must wait
        // after the initial message has been handed out.
        let termination = Arc::new(tokio::sync::Semaphore::new(0));
        let transport = Self {
            message: Some(message),
            sender: outbound_tx,
            termination,
        };
        (transport, outbound_rx)
    }
}
impl<R> Transport<R> for OneshotTransport<R>
where
R: ServiceRole,
{
type Error = tokio::sync::mpsc::error::SendError<TxJsonRpcMessage<R>>;
fn send(
&mut self,
item: TxJsonRpcMessage<R>,
) -> impl Future<Output = Result<(), Self::Error>> + Send + 'static {
let sender = self.sender.clone();
let terminate = matches!(item, TxJsonRpcMessage::<R>::Response(_))
|| matches!(item, TxJsonRpcMessage::<R>::Error(_));
let termination = self.termination.clone();
async move {
sender.send(item).await?;
if terminate {
termination.add_permits(1);
}
Ok(())
}
}
async fn receive(&mut self) -> Option<RxJsonRpcMessage<R>> {
if let Some(msg) = self.message.take() {
return Some(msg);
}
let _ = self.termination.acquire().await;
None
}
fn close(&mut self) -> impl Future<Output = Result<(), Self::Error>> + Send {
self.message.take();
std::future::ready(Ok(()))
}
}
/// A transport error whose concrete type has been erased, tagged with the
/// transport that produced it.
///
/// Displays as `Transport [<transport_name>] error: <error>`.
#[derive(Debug, thiserror::Error)]
#[error("Transport [{transport_name}] error: {error}")]
#[non_exhaustive]
pub struct DynamicTransportError {
/// Name of the originating transport; `new` fills it from `Transport::name()`.
pub transport_name: Cow<'static, str>,
/// `TypeId` of the originating transport type; checked by `is` and `downcast`.
pub transport_type_id: std::any::TypeId,
/// The underlying error as a boxed trait object, exposed as the error source.
#[source]
pub error: Box<dyn std::error::Error + Send + Sync>,
}
impl DynamicTransportError {
    /// Wraps an error produced by transport type `T`, recording the
    /// transport's name and `TypeId` alongside the boxed error.
    pub fn new<T, R>(e: T::Error) -> Self
    where
        T: Transport<R> + 'static,
        R: ServiceRole,
    {
        Self::from_parts(T::name(), std::any::TypeId::of::<T>(), Box::new(e))
    }

    /// Assembles an error from already type-erased components.
    pub fn from_parts(
        transport_name: impl Into<Cow<'static, str>>,
        transport_type_id: std::any::TypeId,
        error: Box<dyn std::error::Error + Send + Sync>,
    ) -> Self {
        let transport_name = transport_name.into();
        Self {
            transport_name,
            transport_type_id,
            error,
        }
    }

    /// Recovers the concrete `T::Error` if this error came from transport `T`.
    ///
    /// Returns `Err(self)` with the value unchanged when either the recorded
    /// transport type or the boxed error type does not match.
    pub fn downcast<T, R>(self) -> Result<T::Error, Self>
    where
        T: Transport<R> + 'static,
        R: ServiceRole,
    {
        if self.transport_type_id != std::any::TypeId::of::<T>() {
            return Err(self);
        }
        let Self {
            transport_name,
            transport_type_id,
            error,
        } = self;
        match error.downcast::<T::Error>() {
            Ok(concrete) => Ok(*concrete),
            // Wrong error type: hand the original value back intact.
            Err(error) => Err(Self {
                transport_name,
                transport_type_id,
                error,
            }),
        }
    }

    /// Reports whether this error was produced by transport `T`: both the
    /// recorded `TypeId` and the boxed error's concrete type must match.
    pub fn is<T, R>(&self) -> bool
    where
        T: Transport<R> + 'static,
        R: ServiceRole,
    {
        let same_transport = self.transport_type_id == std::any::TypeId::of::<T>();
        same_transport && self.error.is::<T::Error>()
    }
}