use std::{borrow::Cow, collections::HashMap, sync::Arc, time::Duration};
use futures::{Stream, StreamExt, future::BoxFuture, stream::BoxStream};
use http::{HeaderName, HeaderValue};
pub use sse_stream::Error as SseError;
use sse_stream::Sse;
use thiserror::Error;
use tokio_util::sync::CancellationToken;
use tracing::debug;
use super::common::client_side_sse::{ExponentialBackoff, SseRetryPolicy, SseStreamReconnect};
use crate::{
RoleClient,
model::{
ClientJsonRpcMessage, ClientNotification, InitializedNotification, ServerJsonRpcMessage,
ServerResult,
},
transport::{
common::client_side_sse::SseAutoReconnectStream,
worker::{Worker, WorkerQuitReason, WorkerSendRequest, WorkerTransport},
},
};
/// Boxed stream of raw SSE events as produced by the underlying HTTP client.
type BoxedSseStream = BoxStream<'static, Result<Sse, SseError>>;
/// Raised when the server demands authentication before serving the request.
///
/// Carries the raw `WWW-Authenticate` header value the server returned so the
/// caller can drive an auth flow from it.
#[derive(Debug)]
#[non_exhaustive]
pub struct AuthRequiredError {
    pub www_authenticate_header: String,
}
impl AuthRequiredError {
    /// Builds the error from the server-provided `WWW-Authenticate` header value.
    pub fn new(www_authenticate_header: String) -> Self {
        AuthRequiredError {
            www_authenticate_header,
        }
    }
}
/// Raised when the request was authenticated but the token's scope is not
/// sufficient for the operation.
///
/// Holds the raw `WWW-Authenticate` header and, when the server supplied one,
/// the scope that would be required to succeed.
#[derive(Debug)]
#[non_exhaustive]
pub struct InsufficientScopeError {
    pub www_authenticate_header: String,
    pub required_scope: Option<String>,
}
impl InsufficientScopeError {
    /// Builds the error from the `WWW-Authenticate` header and the optional
    /// scope the server asked for.
    pub fn new(www_authenticate_header: String, required_scope: Option<String>) -> Self {
        InsufficientScopeError {
            www_authenticate_header,
            required_scope,
        }
    }
    /// `true` when the server named a concrete scope, i.e. an upgrade of the
    /// current token could satisfy the request.
    pub fn can_upgrade(&self) -> bool {
        matches!(self.required_scope, Some(_))
    }
    /// The scope the server requires, if it reported one.
    pub fn get_required_scope(&self) -> Option<&str> {
        self.required_scope.as_ref().map(String::as_str)
    }
}
/// Error type of the streamable-HTTP client transport.
///
/// `E` is the error type of the concrete [`StreamableHttpClient`]
/// implementation in use.
#[derive(Error, Debug)]
#[non_exhaustive]
pub enum StreamableHttpError<E: std::error::Error + Send + Sync + 'static> {
    #[error("SSE error: {0}")]
    Sse(#[from] SseError),
    #[error("Io error: {0}")]
    Io(#[from] std::io::Error),
    /// Error surfaced by the underlying HTTP client implementation.
    #[error("Client error: {0}")]
    Client(E),
    #[error("unexpected end of stream")]
    UnexpectedEndOfStream,
    /// The server's reply did not match the expected shape; the payload
    /// describes what was expected.
    #[error("unexpected server response: {0}")]
    UnexpectedServerResponse(Cow<'static, str>),
    #[error("Unexpected content type: {0:?}")]
    UnexpectedContentType(Option<String>),
    #[error("Server does not support SSE")]
    ServerDoesNotSupportSse,
    #[error("Server does not support delete session")]
    ServerDoesNotSupportDeleteSession,
    #[error("Tokio join error: {0}")]
    TokioJoinError(#[from] tokio::task::JoinError),
    #[error("Deserialize error: {0}")]
    Deserialize(#[from] serde_json::Error),
    /// The channel between worker and handler closed (transport dropped).
    #[error("Transport channel closed")]
    TransportChannelClosed,
    /// The initialize response carried no session id while stateless mode
    /// (`allow_stateless`) is disabled.
    #[error("Missing session id in HTTP response")]
    MissingSessionIdInResponse,
    #[cfg(feature = "auth")]
    #[error("Auth error: {0}")]
    Auth(#[from] crate::transport::auth::AuthError),
    /// The server requires authentication; details (including the raw
    /// `WWW-Authenticate` header) are in the payload.
    #[error("Auth required")]
    AuthRequired(AuthRequiredError),
    /// Authenticated, but the token lacks the scope the server requires.
    #[error("Insufficient scope")]
    InsufficientScope(InsufficientScopeError),
    #[error("Header name '{0}' is reserved and conflicts with default headers")]
    ReservedHeaderConflict(String),
    /// The server reported the session gone (HTTP 404); the worker may
    /// transparently re-initialize when `reinit_on_expired_session` is set.
    #[error("Session expired (HTTP 404)")]
    SessionExpired,
}
/// Protocol-level violations of the streamable-HTTP exchange, independent of
/// any particular client implementation.
#[derive(Debug, Clone, Error)]
#[non_exhaustive]
pub enum StreamableHttpProtocolError {
    /// The server's response did not include a session id where one was expected.
    #[error("Missing session id in response")]
    MissingSessionIdInResponse,
}
/// The possible server replies to a POSTed client message.
#[allow(clippy::large_enum_variant)]
#[non_exhaustive]
pub enum StreamableHttpPostResponse {
    /// Message accepted with no response body.
    Accepted,
    /// A single JSON message, plus the session id if the server returned one.
    Json(ServerJsonRpcMessage, Option<String>),
    /// An SSE stream of messages, plus the session id if the server returned one.
    Sse(BoxedSseStream, Option<String>),
}
impl std::fmt::Debug for StreamableHttpPostResponse {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
Self::Accepted => write!(f, "Accepted"),
Self::Json(arg0, arg1) => f.debug_tuple("Json").field(arg0).field(arg1).finish(),
Self::Sse(_, arg1) => f.debug_tuple("Sse").field(arg1).finish(),
}
}
}
impl StreamableHttpPostResponse {
pub async fn expect_initialized<E>(
self,
) -> Result<(ServerJsonRpcMessage, Option<String>), StreamableHttpError<E>>
where
E: std::error::Error + Send + Sync + 'static,
{
match self {
Self::Json(message, session_id) => Ok((message, session_id)),
Self::Sse(mut stream, session_id) => {
while let Some(event) = stream.next().await {
let event = event?;
let payload = event.data.unwrap_or_default();
if payload.trim().is_empty() {
continue;
}
let message: ServerJsonRpcMessage = serde_json::from_str(&payload)?;
if matches!(message, ServerJsonRpcMessage::Response(_)) {
return Ok((message, session_id));
}
debug!(
?message,
"received message before initialize response; continuing to drain stream"
);
}
Err(StreamableHttpError::UnexpectedServerResponse(
"empty sse stream".into(),
))
}
_ => Err(StreamableHttpError::UnexpectedServerResponse(
"expect initialized, accepted".into(),
)),
}
}
pub fn expect_json<E>(self) -> Result<ServerJsonRpcMessage, StreamableHttpError<E>>
where
E: std::error::Error + Send + Sync + 'static,
{
match self {
Self::Json(message, ..) => Ok(message),
got => Err(StreamableHttpError::UnexpectedServerResponse(
format!("expect json, got {got:?}").into(),
)),
}
}
pub fn expect_accepted_or_json<E>(self) -> Result<(), StreamableHttpError<E>>
where
E: std::error::Error + Send + Sync + 'static,
{
match self {
Self::Accepted => Ok(()),
Self::Json(..) => Ok(()),
got => Err(StreamableHttpError::UnexpectedServerResponse(
format!("expect accepted or json, got {got:?}").into(),
)),
}
}
}
/// Abstraction over the HTTP client used by the streamable-HTTP transport.
///
/// Implementations must be cheaply cloneable: the worker clones the client
/// into every spawned SSE stream task.
pub trait StreamableHttpClient: Clone + Send + 'static {
    /// Error type produced by this client implementation.
    type Error: std::error::Error + Send + Sync + 'static;
    /// Posts a JSON-RPC message to `uri`, attaching the session id, auth
    /// header, and custom headers when provided, and classifies the server's
    /// reply as accepted / JSON / SSE.
    fn post_message(
        &self,
        uri: Arc<str>,
        message: ClientJsonRpcMessage,
        session_id: Option<Arc<str>>,
        auth_header: Option<String>,
        custom_headers: HashMap<HeaderName, HeaderValue>,
    ) -> impl Future<Output = Result<StreamableHttpPostResponse, StreamableHttpError<Self::Error>>>
    + Send
    + '_;
    /// Asks the server to delete `session_id` (used for best-effort cleanup on
    /// worker shutdown).
    fn delete_session(
        &self,
        uri: Arc<str>,
        session_id: Arc<str>,
        auth_header: Option<String>,
        custom_headers: HashMap<HeaderName, HeaderValue>,
    ) -> impl Future<Output = Result<(), StreamableHttpError<Self::Error>>> + Send + '_;
    /// Opens the standing server-to-client SSE stream for `session_id`,
    /// optionally resuming from `last_event_id` after a disconnect.
    fn get_stream(
        &self,
        uri: Arc<str>,
        session_id: Arc<str>,
        last_event_id: Option<String>,
        auth_header: Option<String>,
        custom_headers: HashMap<HeaderName, HeaderValue>,
    ) -> impl Future<
        Output = Result<
            BoxStream<'static, Result<Sse, SseError>>,
            StreamableHttpError<Self::Error>,
        >,
    > + Send
    + '_;
}
/// Simple retry settings: an optional cap on attempts and a minimum delay
/// between them.
// NOTE(review): not referenced in this file — the transport config uses
// `Arc<dyn SseRetryPolicy>` instead; presumably part of the public API for
// callers — confirm before removing.
#[non_exhaustive]
pub struct RetryConfig {
    pub max_times: Option<usize>,
    pub min_duration: Duration,
}
/// Captures everything needed to re-open a session's GET SSE stream after a
/// disconnect; used by [`SseAutoReconnectStream`].
struct StreamableHttpClientReconnect<C> {
    pub client: C,
    pub session_id: Arc<str>,
    pub uri: Arc<str>,
    pub auth_header: Option<String>,
    pub custom_headers: HashMap<HeaderName, HeaderValue>,
}
impl<C: StreamableHttpClient> SseStreamReconnect for StreamableHttpClientReconnect<C> {
    type Error = StreamableHttpError<C::Error>;
    type Future = BoxFuture<'static, Result<BoxedSseStream, Self::Error>>;
    // Re-opens the session's GET stream, resuming from `last_event_id` when
    // one was recorded. Everything the request needs is cloned up front so the
    // returned future is 'static.
    fn retry_connection(&mut self, last_event_id: Option<&str>) -> Self::Future {
        let resume_from = last_event_id.map(str::to_owned);
        let (client, uri, session_id, auth_header, custom_headers) = (
            self.client.clone(),
            self.uri.clone(),
            self.session_id.clone(),
            self.auth_header.clone(),
            self.custom_headers.clone(),
        );
        Box::pin(async move {
            client
                .get_stream(uri, session_id, resume_from, auth_header, custom_headers)
                .await
        })
    }
}
/// Everything needed to issue a best-effort `delete_session` when the worker
/// shuts down, captured while the session was live.
struct SessionCleanupInfo<C> {
    client: C,
    uri: Arc<str>,
    session_id: Arc<str>,
    auth_header: Option<String>,
    protocol_headers: HashMap<HeaderName, HeaderValue>,
}
/// [`Worker`] implementation driving the streamable-HTTP client transport:
/// pairs the HTTP client with the transport configuration.
#[derive(Debug, Clone, Default)]
#[non_exhaustive]
pub struct StreamableHttpClientWorker<C: StreamableHttpClient> {
    pub client: C,
    pub config: StreamableHttpClientTransportConfig,
}
impl<C: StreamableHttpClient + Default> StreamableHttpClientWorker<C> {
    /// Convenience constructor: a default client pointed at `url`, with every
    /// other transport option left at its default.
    pub fn new_simple(url: impl Into<Arc<str>>) -> Self {
        let config = StreamableHttpClientTransportConfig {
            uri: url.into(),
            ..Default::default()
        };
        Self {
            client: C::default(),
            config,
        }
    }
}
impl<C: StreamableHttpClient> StreamableHttpClientWorker<C> {
    /// Creates a worker from an explicit client and configuration.
    pub fn new(client: C, config: StreamableHttpClientTransportConfig) -> Self {
        Self { client, config }
    }
}
impl<C: StreamableHttpClient> StreamableHttpClientWorker<C> {
    /// Converts a raw SSE event stream into a stream of JSON-RPC messages.
    ///
    /// Only events whose `event` field is absent, empty, or `"message"` are
    /// considered; events with missing or blank data (keep-alives) are
    /// skipped, and payloads that fail to deserialize are logged at debug
    /// level and dropped rather than terminating the stream.
    fn raw_sse_to_jsonrpc(
        stream: BoxedSseStream,
    ) -> impl Stream<Item = Result<ServerJsonRpcMessage, StreamableHttpError<C::Error>>> + Send + 'static
    {
        stream.filter_map(|event| async {
            match event {
                Err(e) => Some(Err(StreamableHttpError::Sse(e))),
                Ok(sse) => {
                    let is_message =
                        matches!(sse.event.as_deref(), None | Some("") | Some("message"));
                    if !is_message {
                        return None;
                    }
                    let data = sse.data?;
                    if data.trim().is_empty() {
                        return None;
                    }
                    match serde_json::from_str::<ServerJsonRpcMessage>(&data) {
                        Ok(msg) => Some(Ok(msg)),
                        Err(e) => {
                            // Malformed payloads are skipped, not fatal.
                            tracing::debug!("failed to deserialize server message: {e}");
                            None
                        }
                    }
                }
            }
        })
    }
    /// Pumps messages from `sse_stream` into `sse_worker_tx` until the stream
    /// ends, the receiving side is dropped, or `ct` is cancelled.
    ///
    /// When `close_on_response` is set (streams opened in reply to a POST),
    /// the task stops after forwarding the first response/error message,
    /// briefly draining the remainder (50 ms budget) so the underlying HTTP
    /// connection can be reused.
    async fn execute_sse_stream(
        sse_stream: impl Stream<Item = Result<ServerJsonRpcMessage, StreamableHttpError<C::Error>>>
        + Send
        + 'static,
        sse_worker_tx: tokio::sync::mpsc::Sender<ServerJsonRpcMessage>,
        close_on_response: bool,
        ct: CancellationToken,
    ) -> Result<(), StreamableHttpError<C::Error>> {
        let mut sse_stream = std::pin::pin!(sse_stream);
        loop {
            let message = tokio::select! {
                event = sse_stream.next() => {
                    event
                }
                _ = ct.cancelled() => {
                    tracing::debug!("cancelled");
                    break;
                }
            };
            // `None` means the stream ended; `Err` propagates to the caller.
            let Some(message) = message.transpose()? else {
                break;
            };
            let is_response = matches!(
                message,
                ServerJsonRpcMessage::Response(_) | ServerJsonRpcMessage::Error(_)
            );
            let yield_result = sse_worker_tx.send(message).await;
            if yield_result.is_err() {
                // Receiver gone: the transport was dropped.
                tracing::trace!("streamable http transport worker dropped, exiting");
                break;
            }
            if close_on_response && is_response {
                tracing::debug!("got response, draining sse stream for connection reuse");
                let _ = tokio::time::timeout(std::time::Duration::from_millis(50), async {
                    while sse_stream.next().await.is_some() {}
                })
                .await;
                break;
            }
        }
        Ok(())
    }
    /// Re-runs the initialize handshake after a session expiry (HTTP 404).
    ///
    /// Posts the saved `initialize` request without a session id, extracts the
    /// new session id and the negotiated `mcp-protocol-version` header from
    /// the response, then sends a fresh `initialized` notification. Returns
    /// the new session id (if any) and the headers to use from now on.
    async fn perform_reinitialization(
        client: C,
        saved_init_request: ClientJsonRpcMessage,
        uri: Arc<str>,
        auth_header: Option<String>,
        custom_headers: HashMap<HeaderName, HeaderValue>,
    ) -> Result<(Option<Arc<str>>, HashMap<HeaderName, HeaderValue>), StreamableHttpError<C::Error>>
    {
        let (init_msg, new_session_id_str) = client
            .post_message(
                uri.clone(),
                saved_init_request,
                None,
                auth_header.clone(),
                custom_headers.clone(),
            )
            .await?
            .expect_initialized::<C::Error>()
            .await?;
        let new_session_id: Option<Arc<str>> = new_session_id_str.map(|s| Arc::from(s.as_str()));
        // Start from the caller-supplied custom headers and layer the newly
        // negotiated protocol version on top.
        let mut new_protocol_headers = custom_headers;
        if let ServerJsonRpcMessage::Response(response) = &init_msg {
            if let ServerResult::InitializeResult(init_result) = &response.result {
                if let Ok(hv) = HeaderValue::from_str(init_result.protocol_version.as_str()) {
                    new_protocol_headers
                        .insert(HeaderName::from_static("mcp-protocol-version"), hv);
                }
            }
        }
        let initialized_notification = ClientJsonRpcMessage::notification(
            ClientNotification::InitializedNotification(InitializedNotification {
                method: Default::default(),
                extensions: Default::default(),
            }),
        );
        client
            .post_message(
                uri,
                initialized_notification,
                new_session_id.clone(),
                auth_header,
                new_protocol_headers.clone(),
            )
            .await?
            .expect_accepted_or_json::<C::Error>()?;
        Ok((new_session_id, new_protocol_headers))
    }
}
impl<C: StreamableHttpClient> Worker for StreamableHttpClientWorker<C> {
    type Role = RoleClient;
    type Error = StreamableHttpError<C::Error>;
    /// Error reported when the worker channel is closed.
    fn err_closed() -> Self::Error {
        StreamableHttpError::TransportChannelClosed
    }
    /// Wraps a task join failure into the transport error type.
    fn err_join(e: tokio::task::JoinError) -> Self::Error {
        StreamableHttpError::TokioJoinError(e)
    }
    fn config(&self) -> super::worker::WorkerConfig {
        super::worker::WorkerConfig {
            name: Some("StreamableHttpClientWorker".into()),
            channel_buffer_capacity: self.config.channel_buffer_capacity,
        }
    }
    /// Main worker loop for the streamable-HTTP transport.
    ///
    /// Phases:
    /// 1. Receive the `initialize` request from the handler and POST it.
    /// 2. Extract the session id (optional when `allow_stateless`) and the
    ///    negotiated `mcp-protocol-version` header from the response.
    /// 3. Forward the response to the handler, then POST the handler's
    ///    `initialized` notification.
    /// 4. For stateful sessions, open the standing server-to-client SSE
    ///    stream with auto-reconnect.
    /// 5. Multiplex: outgoing client messages (POST; replies may be JSON or
    ///    per-request SSE streams), incoming server messages, and finished
    ///    stream tasks — transparently re-initializing on `SessionExpired`
    ///    when `reinit_on_expired_session` is set.
    /// 6. On exit, best-effort delete of the session (5 s timeout).
    async fn run(
        self,
        mut context: super::worker::WorkerContext<Self>,
    ) -> Result<(), WorkerQuitReason<Self::Error>> {
        let channel_buffer_capacity = self.config.channel_buffer_capacity;
        // Channel through which spawned SSE tasks hand server messages back
        // to this loop.
        let (sse_worker_tx, mut sse_worker_rx) =
            tokio::sync::mpsc::channel::<ServerJsonRpcMessage>(channel_buffer_capacity);
        let config = self.config.clone();
        let transport_task_ct = context.cancellation_token.clone();
        // Cancels all child stream tasks if this future is dropped.
        let _drop_guard = transport_task_ct.clone().drop_guard();
        // Phase 1: the first handler message must be the `initialize` request.
        let WorkerSendRequest {
            responder,
            message: initialize_request,
        } = context.recv_from_handler().await?;
        // Kept verbatim for transparent re-initialization after expiry.
        let saved_init_request = initialize_request.clone();
        let (message, session_id) = match self
            .client
            .post_message(
                config.uri.clone(),
                initialize_request,
                None,
                config.auth_header.clone(),
                config.custom_headers.clone(),
            )
            .await
        {
            Ok(res) => {
                let _ = responder.send(Ok(()));
                res.expect_initialized::<C::Error>().await.map_err(
                    WorkerQuitReason::fatal_context("process initialize response"),
                )?
            }
            Err(err) => {
                // The concrete error is moved into the responder; only a
                // formatted copy remains for the quit reason's context.
                let msg = format!("{:?}", err);
                let _ = responder.send(Err(err));
                return Err(WorkerQuitReason::fatal(
                    StreamableHttpError::TransportChannelClosed,
                    msg,
                ));
            }
        };
        // Phase 2: a missing session id is only acceptable in stateless mode.
        let mut session_id: Option<Arc<str>> = if let Some(session_id) = session_id {
            Some(session_id.into())
        } else {
            if !self.config.allow_stateless {
                return Err(WorkerQuitReason::fatal(
                    StreamableHttpError::<C::Error>::MissingSessionIdInResponse,
                    "process initialize response",
                ));
            }
            None
        };
        // Headers for all subsequent requests: custom headers plus the
        // protocol version the server reported in its InitializeResult.
        let mut protocol_headers = {
            let mut headers = config.custom_headers.clone();
            if let ServerJsonRpcMessage::Response(response) = &message {
                if let ServerResult::InitializeResult(init_result) = &response.result {
                    if let Ok(hv) = HeaderValue::from_str(init_result.protocol_version.as_str()) {
                        headers.insert(HeaderName::from_static("mcp-protocol-version"), hv);
                    }
                }
            }
            headers
        };
        // Captured now so the session can be deleted even if the main loop
        // exits with an error.
        let mut session_cleanup_info = session_id.as_ref().map(|sid| SessionCleanupInfo {
            client: self.client.clone(),
            uri: config.uri.clone(),
            session_id: sid.clone(),
            auth_header: config.auth_header.clone(),
            protocol_headers: protocol_headers.clone(),
        });
        context.send_to_handler(message).await?;
        // Phase 3: relay the handler's `initialized` notification.
        let initialized_notification = context.recv_from_handler().await?;
        self.client
            .post_message(
                config.uri.clone(),
                initialized_notification.message,
                session_id.clone(),
                config.auth_header.clone(),
                protocol_headers.clone(),
            )
            .await
            .map_err(WorkerQuitReason::fatal_context(
                "send initialized notification",
            ))?
            .expect_accepted_or_json::<C::Error>()
            .map_err(WorkerQuitReason::fatal_context(
                "process initialized notification response",
            ))?;
        let _ = initialized_notification.responder.send(Ok(()));
        // Events multiplexed by the main loop below.
        #[allow(clippy::large_enum_variant)]
        enum Event<W: Worker, E: std::error::Error + Send + Sync + 'static> {
            ClientMessage(WorkerSendRequest<W>),
            ServerMessage(ServerJsonRpcMessage),
            StreamResult(Result<(), StreamableHttpError<E>>),
        }
        let mut streams = tokio::task::JoinSet::new();
        // Phase 4: standing server-to-client GET stream (stateful sessions
        // only). Wrapped in SseAutoReconnectStream so transient disconnects
        // resume from the last event id.
        if let Some(session_id) = &session_id {
            let client = self.client.clone();
            let uri = config.uri.clone();
            let session_id = session_id.clone();
            let auth_header = config.auth_header.clone();
            let retry_config = self.config.retry_config.clone();
            let sse_worker_tx = sse_worker_tx.clone();
            let transport_task_ct = transport_task_ct.clone();
            let config_uri = config.uri.clone();
            let config_auth_header = config.auth_header.clone();
            let spawn_headers = protocol_headers.clone();
            streams.spawn(async move {
                match client
                    .get_stream(
                        uri.clone(),
                        session_id.clone(),
                        None,
                        auth_header.clone(),
                        spawn_headers.clone(),
                    )
                    .await
                {
                    Ok(stream) => {
                        let sse_stream = SseAutoReconnectStream::new(
                            stream,
                            StreamableHttpClientReconnect {
                                client: client.clone(),
                                session_id: session_id.clone(),
                                uri: config_uri,
                                auth_header: config_auth_header,
                                custom_headers: spawn_headers,
                            },
                            retry_config,
                        );
                        Self::execute_sse_stream(
                            sse_stream,
                            sse_worker_tx,
                            false,
                            transport_task_ct.child_token(),
                        )
                        .await
                    }
                    // Not fatal: the server may only ever reply over POST.
                    Err(StreamableHttpError::ServerDoesNotSupportSse) => {
                        tracing::debug!("server doesn't support sse, skip common stream");
                        Ok(())
                    }
                    Err(e) => {
                        tracing::error!("fail to get common stream: {e}");
                        Err(e)
                    }
                }
            });
        }
        // Phase 5: main multiplexing loop.
        let loop_result: Result<(), WorkerQuitReason<Self::Error>> = 'main_loop: loop {
            let event = tokio::select! {
                _ = transport_task_ct.cancelled() => {
                    tracing::debug!("cancelled");
                    break 'main_loop Err(WorkerQuitReason::Cancelled);
                }
                message = context.recv_from_handler() => {
                    match message {
                        Ok(msg) => Event::ClientMessage(msg),
                        Err(e) => break 'main_loop Err(e),
                    }
                },
                message = sse_worker_rx.recv() => {
                    let Some(message) = message else {
                        tracing::trace!("transport dropped, exiting");
                        break 'main_loop Err(WorkerQuitReason::HandlerTerminated);
                    };
                    Event::ServerMessage(message)
                },
                terminated_stream = streams.join_next(), if !streams.is_empty() => {
                    match terminated_stream {
                        Some(result) => {
                            Event::StreamResult(result.map_err(StreamableHttpError::TokioJoinError).and_then(std::convert::identity))
                        }
                        None => {
                            continue
                        }
                    }
                }
            };
            match event {
                // Outgoing message: POST it and route the reply.
                Event::ClientMessage(send_request) => {
                    let WorkerSendRequest { message, responder } = send_request;
                    let response = self
                        .client
                        .post_message(
                            config.uri.clone(),
                            message.clone(),
                            session_id.clone(),
                            config.auth_header.clone(),
                            protocol_headers.clone(),
                        )
                        .await;
                    let send_result = match response {
                        // Session expired mid-flight: optionally re-run the
                        // initialize handshake and retry the message once.
                        Err(StreamableHttpError::SessionExpired) => {
                            if !config.reinit_on_expired_session {
                                Err(StreamableHttpError::SessionExpired)
                            } else {
                                tracing::info!(
                                    "session expired (HTTP 404), attempting transparent re-initialization"
                                );
                                match Self::perform_reinitialization(
                                    self.client.clone(),
                                    saved_init_request.clone(),
                                    config.uri.clone(),
                                    config.auth_header.clone(),
                                    config.custom_headers.clone(),
                                )
                                .await
                                {
                                    Ok((new_session_id, new_protocol_headers)) => {
                                        // Old streams belong to the dead
                                        // session; drop them before swapping
                                        // in the new state.
                                        streams.abort_all();
                                        session_id = new_session_id;
                                        protocol_headers = new_protocol_headers;
                                        session_cleanup_info =
                                            session_id.as_ref().map(|sid| SessionCleanupInfo {
                                                client: self.client.clone(),
                                                uri: config.uri.clone(),
                                                session_id: sid.clone(),
                                                auth_header: config.auth_header.clone(),
                                                protocol_headers: protocol_headers.clone(),
                                            });
                                        // Re-open the standing GET stream for
                                        // the new session, mirroring phase 4.
                                        if let Some(new_sid) = &session_id {
                                            let client = self.client.clone();
                                            let uri = config.uri.clone();
                                            let new_sid = new_sid.clone();
                                            let auth_header = config.auth_header.clone();
                                            let retry_config = self.config.retry_config.clone();
                                            let sse_tx = sse_worker_tx.clone();
                                            let task_ct = transport_task_ct.clone();
                                            let config_uri = config.uri.clone();
                                            let config_auth = config.auth_header.clone();
                                            let spawn_headers = protocol_headers.clone();
                                            streams.spawn(async move {
                                                match client
                                                    .get_stream(
                                                        uri,
                                                        new_sid.clone(),
                                                        None,
                                                        auth_header.clone(),
                                                        spawn_headers.clone(),
                                                    )
                                                    .await
                                                {
                                                    Ok(stream) => {
                                                        let sse_stream = SseAutoReconnectStream::new(
                                                            stream,
                                                            StreamableHttpClientReconnect {
                                                                client: client.clone(),
                                                                session_id: new_sid,
                                                                uri: config_uri,
                                                                auth_header: config_auth,
                                                                custom_headers: spawn_headers,
                                                            },
                                                            retry_config,
                                                        );
                                                        Self::execute_sse_stream(
                                                            sse_stream,
                                                            sse_tx,
                                                            false,
                                                            task_ct.child_token(),
                                                        )
                                                        .await
                                                    }
                                                    Err(StreamableHttpError::ServerDoesNotSupportSse) => {
                                                        tracing::debug!(
                                                            "server doesn't support sse after re-init"
                                                        );
                                                        Ok(())
                                                    }
                                                    Err(e) => {
                                                        tracing::error!(
                                                            "fail to get common stream after re-init: {e}"
                                                        );
                                                        Err(e)
                                                    }
                                                }
                                            });
                                        }
                                        // Retry the original message against
                                        // the fresh session.
                                        let retry_response = self
                                            .client
                                            .post_message(
                                                config.uri.clone(),
                                                message,
                                                session_id.clone(),
                                                config.auth_header.clone(),
                                                protocol_headers.clone(),
                                            )
                                            .await;
                                        match retry_response {
                                            Err(e) => Err(e),
                                            Ok(StreamableHttpPostResponse::Accepted) => {
                                                tracing::trace!(
                                                    "client message accepted after re-init"
                                                );
                                                Ok(())
                                            }
                                            Ok(StreamableHttpPostResponse::Json(msg, ..)) => {
                                                context.send_to_handler(msg).await?;
                                                Ok(())
                                            }
                                            Ok(StreamableHttpPostResponse::Sse(stream, ..)) => {
                                                streams.spawn(Self::execute_sse_stream(
                                                    Self::raw_sse_to_jsonrpc(stream),
                                                    sse_worker_tx.clone(),
                                                    true,
                                                    transport_task_ct.child_token(),
                                                ));
                                                tracing::trace!("got new sse stream after re-init");
                                                Ok(())
                                            }
                                        }
                                    }
                                    Err(reinit_err) => Err(reinit_err),
                                }
                            } }
                        Err(e) => Err(e),
                        Ok(StreamableHttpPostResponse::Accepted) => {
                            tracing::trace!("client message accepted");
                            Ok(())
                        }
                        Ok(StreamableHttpPostResponse::Json(message, ..)) => {
                            context.send_to_handler(message).await?;
                            Ok(())
                        }
                        // Per-request SSE stream: forward until the response
                        // arrives (close_on_response = true).
                        Ok(StreamableHttpPostResponse::Sse(stream, ..)) => {
                            streams.spawn(Self::execute_sse_stream(
                                Self::raw_sse_to_jsonrpc(stream),
                                sse_worker_tx.clone(),
                                true,
                                transport_task_ct.child_token(),
                            ));
                            tracing::trace!("got new sse stream");
                            Ok(())
                        }
                    };
                    let _ = responder.send(send_result);
                }
                // Server-initiated message from one of the SSE streams.
                Event::ServerMessage(json_rpc_message) => {
                    if let Err(e) = context.send_to_handler(json_rpc_message).await {
                        break 'main_loop Err(e);
                    }
                }
                // A stream task finished; errors are logged but do not stop
                // the worker.
                Event::StreamResult(result) => {
                    if result.is_err() {
                        tracing::warn!(
                            "sse client event stream terminated with error: {:?}",
                            result
                        );
                    }
                }
            }
        };
        // Phase 6: best-effort session deletion on the way out.
        if let Some(cleanup) = session_cleanup_info {
            const SESSION_CLEANUP_TIMEOUT: std::time::Duration = std::time::Duration::from_secs(5);
            let cleanup_session_id = cleanup.session_id.clone();
            match tokio::time::timeout(
                SESSION_CLEANUP_TIMEOUT,
                cleanup.client.delete_session(
                    cleanup.uri,
                    cleanup.session_id,
                    cleanup.auth_header,
                    cleanup.protocol_headers,
                ),
            )
            .await
            {
                Ok(Ok(_)) => {
                    tracing::info!(
                        session_id = cleanup_session_id.as_ref(),
                        "delete session success"
                    )
                }
                Ok(Err(StreamableHttpError::ServerDoesNotSupportDeleteSession)) => {
                    tracing::info!(
                        session_id = cleanup_session_id.as_ref(),
                        "server doesn't support delete session"
                    )
                }
                Ok(Err(e)) => {
                    tracing::error!(
                        session_id = cleanup_session_id.as_ref(),
                        "fail to delete session: {e}"
                    );
                }
                Err(_elapsed) => {
                    tracing::warn!(
                        session_id = cleanup_session_id.as_ref(),
                        "session cleanup timed out after {:?}",
                        SESSION_CLEANUP_TIMEOUT
                    );
                }
            }
        }
        loop_result
    }
}
/// Streamable-HTTP client transport: a [`WorkerTransport`] running a
/// [`StreamableHttpClientWorker`].
pub type StreamableHttpClientTransport<C> = WorkerTransport<StreamableHttpClientWorker<C>>;
impl<C: StreamableHttpClient> StreamableHttpClientTransport<C> {
    /// Spawns the transport worker with the given client and configuration.
    pub fn with_client(client: C, config: StreamableHttpClientTransportConfig) -> Self {
        let worker = StreamableHttpClientWorker::new(client, config);
        WorkerTransport::spawn(worker)
    }
}
/// Configuration for [`StreamableHttpClientTransport`].
#[derive(Debug, Clone)]
#[non_exhaustive]
pub struct StreamableHttpClientTransportConfig {
    /// Endpoint used for POSTing messages and opening the GET SSE stream.
    pub uri: Arc<str>,
    /// Retry/backoff policy applied when reconnecting a dropped SSE stream.
    pub retry_config: Arc<dyn SseRetryPolicy>,
    /// Buffer size of the worker's internal message channels.
    pub channel_buffer_capacity: usize,
    /// When `true`, a missing session id in the initialize response is
    /// tolerated (stateless server); when `false` it is a fatal error.
    pub allow_stateless: bool,
    /// Optional auth header value passed to the client on every request.
    pub auth_header: Option<String>,
    /// Extra headers attached to every request.
    pub custom_headers: HashMap<HeaderName, HeaderValue>,
    /// When `true`, a session expiry (HTTP 404) triggers a transparent
    /// re-initialization instead of surfacing the error.
    pub reinit_on_expired_session: bool,
}
impl StreamableHttpClientTransportConfig {
    /// Creates a config targeting `uri`, with every other option at its default.
    pub fn with_uri(uri: impl Into<Arc<str>>) -> Self {
        Self {
            uri: uri.into(),
            ..Self::default()
        }
    }
    /// Sets the auth header value sent with every request.
    pub fn auth_header<T: Into<String>>(self, value: T) -> Self {
        Self {
            auth_header: Some(value.into()),
            ..self
        }
    }
    /// Replaces the set of extra headers attached to every request.
    pub fn custom_headers(self, custom_headers: HashMap<HeaderName, HeaderValue>) -> Self {
        Self {
            custom_headers,
            ..self
        }
    }
    /// Enables or disables transparent re-initialization on session expiry.
    pub fn reinit_on_expired_session(self, enable: bool) -> Self {
        Self {
            reinit_on_expired_session: enable,
            ..self
        }
    }
}
impl Default for StreamableHttpClientTransportConfig {
    /// Defaults: stateless servers allowed, transparent re-init on expired
    /// sessions, exponential backoff for SSE reconnects, and a placeholder
    /// `"localhost"` uri that callers are expected to override.
    fn default() -> Self {
        Self {
            uri: "localhost".into(),
            retry_config: Arc::new(ExponentialBackoff::default()),
            channel_buffer_capacity: 16,
            allow_stateless: true,
            auth_header: None,
            custom_headers: HashMap::new(),
            reinit_on_expired_session: true,
        }
    }
}