use parking_lot::Mutex;
use std::sync::Arc;
use std::sync::atomic::{AtomicBool, Ordering};
use tokio::sync::Semaphore;
use turbomcp_protocol::jsonrpc::*;
#[cfg(feature = "experimental-tasks")]
use turbomcp_protocol::types::tasks::*;
use turbomcp_protocol::types::{
ClientCapabilities as ProtocolClientCapabilities, InitializeResult as ProtocolInitializeResult,
*,
};
use turbomcp_protocol::{Error, PROTOCOL_VERSION, Result};
use turbomcp_transport::{Transport, TransportConfig, TransportMessage};
use super::config::InitializeResult;
use super::protocol::ProtocolClient;
use crate::{
ClientCapabilities,
handlers::{HandlerError, HandlerRegistry},
sampling::SamplingHandler,
};
/// Shared state behind every [`Client`] handle.
///
/// One instance lives inside an `Arc`; cloning a `Client` only clones that `Arc`.
pub(super) struct ClientInner<T: Transport + 'static> {
/// Protocol layer wrapping the transport; also gives access to the message dispatcher.
pub(super) protocol: ProtocolClient<T>,
/// Capabilities this client was constructed with.
pub(super) capabilities: ClientCapabilities,
/// Set to `true` once the server has answered the `initialize` request.
pub(super) initialized: AtomicBool,
/// Set to `true` by `Client::shutdown`; read on drop to decide whether to warn.
pub(super) shutdown_requested: AtomicBool,
/// Optional handler for server-initiated `sampling/createMessage` requests.
pub(super) sampling_handler: Arc<Mutex<Option<Arc<dyn SamplingHandler>>>>,
/// Registry of the other handlers (roots, elicitation, progress, logging, ...).
pub(super) handlers: Arc<Mutex<HandlerRegistry>>,
/// Permit pool acquired before each server-initiated request or notification is handled.
pub(super) handler_semaphore: Arc<Semaphore>,
}
/// Cheaply cloneable handle to an MCP client running over transport `T`.
///
/// All clones share the same underlying `ClientInner` state.
pub struct Client<T: Transport + 'static> {
/// Reference-counted shared state.
pub(super) inner: Arc<ClientInner<T>>,
}
impl<T: Transport + 'static> Clone for Client<T> {
    /// Produces another handle to the same client; only the reference count
    /// of the shared state is bumped.
    fn clone(&self) -> Self {
        let inner = Arc::clone(&self.inner);
        Self { inner }
    }
}
impl<T: Transport + 'static> Drop for ClientInner<T> {
    /// Warns when the client is torn down without an explicit `shutdown()`
    /// and stops the message dispatcher in either case.
    fn drop(&mut self) {
        let shut_down_explicitly = self.shutdown_requested.load(Ordering::Relaxed);
        if !shut_down_explicitly {
            tracing::warn!(
                "MCP Client dropped without explicit shutdown(). \
                 Call client.shutdown().await before dropping for clean resource cleanup. \
                 Background tasks (WebSocket reconnection) may continue running."
            );
        }

        // Stopping the dispatcher is synchronous, so it can be done from `drop`.
        let dispatcher = self.protocol.dispatcher();
        dispatcher.shutdown();
    }
}
impl<T: Transport + 'static> Client<T> {
    /// Creates a client over `transport` with default capabilities and the
    /// default transport configuration.
    pub fn new(transport: T) -> Self {
        Self::new_with_config(transport, TransportConfig::default())
    }

    /// Creates a client over `transport` with default capabilities and an
    /// explicit transport configuration.
    pub fn new_with_config(transport: T, config: TransportConfig) -> Self {
        Self::with_capabilities_and_config(transport, ClientCapabilities::default(), config)
    }

    /// Creates a client over `transport` with the given capabilities and the
    /// default transport configuration.
    pub fn with_capabilities(transport: T, capabilities: ClientCapabilities) -> Self {
        Self::with_capabilities_and_config(transport, capabilities, TransportConfig::default())
    }

    /// Creates a client with explicit capabilities and transport configuration.
    ///
    /// This is the constructor every other constructor funnels into. It builds
    /// the shared state and registers the dispatcher callbacks for
    /// server-initiated requests and notifications before returning.
    ///
    /// `capabilities.max_concurrent_handlers` sizes the handler semaphore. It is
    /// clamped to `Semaphore::MAX_PERMITS` because `Semaphore::new` panics on
    /// larger values (e.g. a caller passing `usize::MAX` to mean "unlimited").
    pub fn with_capabilities_and_config(
        transport: T,
        capabilities: ClientCapabilities,
        config: TransportConfig,
    ) -> Self {
        let handler_permits = capabilities
            .max_concurrent_handlers
            .min(Semaphore::MAX_PERMITS);

        let client = Self {
            inner: Arc::new(ClientInner {
                protocol: ProtocolClient::with_config(transport, config),
                capabilities,
                initialized: AtomicBool::new(false),
                shutdown_requested: AtomicBool::new(false),
                sampling_handler: Arc::new(Mutex::new(None)),
                handlers: Arc::new(Mutex::new(HandlerRegistry::new())),
                handler_semaphore: Arc::new(Semaphore::new(handler_permits)),
            }),
        };
        client.register_dispatcher_handlers();
        client
    }

    /// Shuts the client down: records the request, stops the message
    /// dispatcher and disconnects the transport.
    ///
    /// A transport disconnect failure is only logged (the transport may already
    /// be closed), so this currently always returns `Ok(())`.
    pub async fn shutdown(&self) -> Result<()> {
        // Recorded first so that `ClientInner::drop` does not warn afterwards.
        self.inner.shutdown_requested.store(true, Ordering::Relaxed);
        tracing::info!("🛑 Shutting down MCP client");

        self.inner.protocol.dispatcher().shutdown();
        tracing::debug!("✅ Message dispatcher stopped");

        match self.inner.protocol.transport().disconnect().await {
            Ok(()) => {
                tracing::info!("✅ Transport disconnected successfully");
            }
            Err(e) => {
                tracing::warn!("Transport disconnect error (may already be closed): {}", e);
            }
        }

        tracing::info!("✅ MCP client shutdown complete");
        Ok(())
    }
}
#[cfg(feature = "http")]
impl Client<turbomcp_transport::streamable_http_client::StreamableHttpClientTransport> {
    /// Connects to an MCP server over streamable HTTP at `url` using the
    /// default HTTP configuration and performs the `initialize` handshake.
    ///
    /// # Errors
    ///
    /// Returns a transport error if the HTTP transport cannot be built, or the
    /// error from `initialize()` if the handshake fails.
    pub async fn connect_http(url: impl Into<String>) -> Result<Self> {
        // Same path as `connect_http_with`, with no configuration tweaks.
        Self::connect_http_with(url, |_config| {}).await
    }

    /// Like [`Client::connect_http`], but lets `config_fn` adjust the HTTP
    /// configuration (after `base_url` has been set) before the transport is
    /// built.
    ///
    /// # Errors
    ///
    /// Returns a transport error if the HTTP transport cannot be built, or the
    /// error from `initialize()` if the handshake fails. In the latter case the
    /// half-set-up client is shut down before the error is returned.
    pub async fn connect_http_with<F>(url: impl Into<String>, config_fn: F) -> Result<Self>
    where
        F: FnOnce(&mut turbomcp_transport::streamable_http_client::StreamableHttpClientConfig),
    {
        use turbomcp_transport::streamable_http_client::{
            StreamableHttpClientConfig, StreamableHttpClientTransport,
        };

        let mut config = StreamableHttpClientConfig {
            base_url: url.into(),
            ..Default::default()
        };
        config_fn(&mut config);

        let transport = StreamableHttpClientTransport::new(config).map_err(|e| {
            turbomcp_protocol::Error::transport(format!("Failed to build HTTP transport: {e}"))
        })?;

        let client = Self::new(transport);
        if let Err(e) = client.initialize().await {
            // Do not drop a connected-but-uninitialized client without the
            // explicit shutdown the rest of this module asks callers to do.
            let _ = client.shutdown().await;
            return Err(e);
        }
        Ok(client)
    }
}
#[cfg(feature = "tcp")]
impl Client<turbomcp_transport::tcp::TcpTransport> {
    /// Connects to an MCP server over TCP and performs the `initialize`
    /// handshake.
    ///
    /// `addr` must be a literal socket address (`ip:port`); it is parsed as a
    /// `SocketAddr`, so host names are not resolved. The local side binds to
    /// the unspecified address of the same IP family with an OS-chosen port.
    ///
    /// # Errors
    ///
    /// Returns an invalid-request error if `addr` does not parse, or the error
    /// from `initialize()` if the handshake fails. In the latter case the
    /// half-set-up client is shut down before the error is returned.
    pub async fn connect_tcp(addr: impl AsRef<str>) -> Result<Self> {
        use std::net::{Ipv4Addr, Ipv6Addr, SocketAddr};
        use turbomcp_transport::tcp::TcpTransport;

        let server_addr: SocketAddr = addr
            .as_ref()
            .parse()
            .map_err(|e| Error::invalid_request(format!("Invalid address: {}", e)))?;

        // Built from constants rather than parsed from strings, so there is no
        // panic path here.
        let bind_addr = if server_addr.is_ipv6() {
            SocketAddr::from((Ipv6Addr::UNSPECIFIED, 0))
        } else {
            SocketAddr::from((Ipv4Addr::UNSPECIFIED, 0))
        };

        let transport = TcpTransport::new_client(bind_addr, server_addr);
        let client = Self::new(transport);
        if let Err(e) = client.initialize().await {
            // Do not drop a connected-but-uninitialized client without the
            // explicit shutdown the rest of this module asks callers to do.
            let _ = client.shutdown().await;
            return Err(e);
        }
        Ok(client)
    }
}
#[cfg(all(unix, feature = "unix"))]
impl Client<turbomcp_transport::unix::UnixTransport> {
    /// Connects to an MCP server over the Unix domain socket at `path` and
    /// performs the `initialize` handshake.
    ///
    /// # Errors
    ///
    /// Returns the error from `initialize()` if the handshake fails; the
    /// half-set-up client is shut down before the error is returned.
    pub async fn connect_unix(path: impl Into<std::path::PathBuf>) -> Result<Self> {
        use turbomcp_transport::unix::UnixTransport;

        let transport = UnixTransport::new_client(path.into());
        let client = Self::new(transport);
        if let Err(e) = client.initialize().await {
            // Do not drop a connected-but-uninitialized client without the
            // explicit shutdown the rest of this module asks callers to do.
            let _ = client.shutdown().await;
            return Err(e);
        }
        Ok(client)
    }
}
impl<T: Transport + 'static> Client<T> {
fn register_dispatcher_handlers(&self) {
let dispatcher = self.inner.protocol.dispatcher();
let client_for_requests = self.clone();
let client_for_notifications = self.clone();
let semaphore = Arc::clone(&self.inner.handler_semaphore);
let request_handler = Arc::new(move |request: JsonRpcRequest| {
let client = client_for_requests.clone();
let method = request.method.clone();
let req_id = request.id.clone();
let semaphore = Arc::clone(&semaphore);
tokio::spawn(async move {
let _permit = match semaphore.acquire().await {
Ok(permit) => permit,
Err(_) => {
tracing::warn!(
"Handler semaphore closed, dropping request: method={}",
method
);
return;
}
};
tracing::debug!(
"🔄 [request_handler] Handling server-initiated request: method={}, id={:?}",
method,
req_id
);
if let Err(e) = client.handle_request(request).await {
tracing::error!(
"❌ [request_handler] Error handling server request '{}': {}",
method,
e
);
tracing::error!(" Request ID: {:?}", req_id);
tracing::error!(" Error kind: {:?}", e.kind);
} else {
tracing::debug!(
"✅ [request_handler] Successfully handled server request: method={}, id={:?}",
method,
req_id
);
}
});
Ok(())
});
let semaphore_notif = Arc::clone(&self.inner.handler_semaphore);
let notification_handler = Arc::new(move |notification: JsonRpcNotification| {
let client = client_for_notifications.clone();
let semaphore = Arc::clone(&semaphore_notif);
tokio::spawn(async move {
let _permit = match semaphore.acquire().await {
Ok(permit) => permit,
Err(_) => {
tracing::warn!("Handler semaphore closed, dropping notification");
return;
}
};
if let Err(e) = client.handle_notification(notification).await {
tracing::error!("Error handling server notification: {}", e);
}
});
Ok(())
});
dispatcher.set_request_handler(request_handler);
dispatcher.set_notification_handler(notification_handler);
tracing::debug!("Dispatcher handlers registered successfully");
}
async fn handle_request(&self, request: JsonRpcRequest) -> Result<()> {
match request.method.as_str() {
"sampling/createMessage" => {
let handler_opt = self.inner.sampling_handler.lock().clone();
if let Some(handler) = handler_opt {
let request_id = match &request.id {
turbomcp_protocol::MessageId::String(s) => s.clone(),
turbomcp_protocol::MessageId::Number(n) => n.to_string(),
turbomcp_protocol::MessageId::Uuid(u) => u.to_string(),
};
let params: CreateMessageRequest =
serde_json::from_value(request.params.unwrap_or(serde_json::Value::Null))
.map_err(|e| {
Error::internal(format!("Invalid createMessage params: {}", e))
})?;
match handler.handle_create_message(request_id, params).await {
Ok(result) => {
let result_value = serde_json::to_value(result).map_err(|e| {
Error::internal(format!("Failed to serialize response: {}", e))
})?;
let response = JsonRpcResponse::success(result_value, request.id);
self.send_response(response).await?;
}
Err(e) => {
tracing::warn!(
"⚠️ [handle_request] Sampling handler returned error: {}",
e
);
let (code, message) = if let Some(handler_err) =
e.downcast_ref::<HandlerError>()
{
let json_err = handler_err.into_jsonrpc_error();
tracing::info!(
"📋 [handle_request] HandlerError mapped to JSON-RPC code: {}",
json_err.code
);
(json_err.code, json_err.message)
} else if let Some(proto_err) =
e.downcast_ref::<turbomcp_protocol::Error>()
{
tracing::info!(
"📋 [handle_request] Protocol error mapped to code: {}",
proto_err.jsonrpc_error_code()
);
(proto_err.jsonrpc_error_code(), proto_err.to_string())
} else {
tracing::warn!(
"📋 [handle_request] Sampling handler returned unknown error type (not HandlerError or Protocol error): {}",
std::any::type_name_of_val(&*e)
);
(-32603, format!("Sampling handler error: {}", e))
};
let error = turbomcp_protocol::jsonrpc::JsonRpcError {
code,
message,
data: None,
};
let response =
JsonRpcResponse::error_response(error, request.id.clone());
tracing::info!(
"🔄 [handle_request] Attempting to send error response for request: {:?}",
request.id
);
self.send_response(response).await?;
tracing::info!(
"✅ [handle_request] Error response sent successfully for request: {:?}",
request.id
);
}
}
} else {
let error = turbomcp_protocol::jsonrpc::JsonRpcError {
code: -32601,
message: "Sampling not supported".to_string(),
data: None,
};
let response = JsonRpcResponse::error_response(error, request.id);
self.send_response(response).await?;
}
}
"roots/list" => {
let handler_opt = self.inner.handlers.lock().roots.clone();
let roots_result = if let Some(handler) = handler_opt {
handler.handle_roots_request().await
} else {
Ok(Vec::new())
};
match roots_result {
Ok(roots) => {
let result_value =
serde_json::to_value(turbomcp_protocol::types::ListRootsResult {
roots,
_meta: None,
})
.map_err(|e| {
Error::internal(format!(
"Failed to serialize roots response: {}",
e
))
})?;
let response = JsonRpcResponse::success(result_value, request.id);
self.send_response(response).await?;
}
Err(e) => {
let json_err = e.into_jsonrpc_error();
let response = JsonRpcResponse::error_response(json_err, request.id);
self.send_response(response).await?;
}
}
}
"elicitation/create" => {
let handler_opt = self.inner.handlers.lock().elicitation.clone();
if let Some(handler) = handler_opt {
let proto_params: turbomcp_protocol::types::ElicitRequestParams =
serde_json::from_value(request.params.unwrap_or(serde_json::Value::Null))
.map_err(|e| {
Error::internal(format!("Invalid elicitation params: {}", e))
})?;
let handler_request =
crate::handlers::ElicitationRequest::new(request.id.clone(), proto_params);
match handler.handle_elicitation(handler_request).await {
Ok(elicit_response) => {
let proto_result = elicit_response.into_protocol();
let result_value = serde_json::to_value(proto_result).map_err(|e| {
Error::internal(format!(
"Failed to serialize elicitation response: {}",
e
))
})?;
let response = JsonRpcResponse::success(result_value, request.id);
self.send_response(response).await?;
}
Err(e) => {
let response =
JsonRpcResponse::error_response(e.into_jsonrpc_error(), request.id);
self.send_response(response).await?;
}
}
} else {
let error = turbomcp_protocol::jsonrpc::JsonRpcError {
code: -32601,
message: "Elicitation not supported - no handler registered".to_string(),
data: None,
};
let response = JsonRpcResponse::error_response(error, request.id);
self.send_response(response).await?;
}
}
_ => {
let error = turbomcp_protocol::jsonrpc::JsonRpcError {
code: -32601,
message: format!("Method not found: {}", request.method),
data: None,
};
let response = JsonRpcResponse::error_response(error, request.id);
self.send_response(response).await?;
}
}
Ok(())
}
async fn handle_notification(&self, notification: JsonRpcNotification) -> Result<()> {
match notification.method.as_str() {
"notifications/progress" => {
let handler_opt = self.inner.handlers.lock().get_progress_handler();
if let Some(handler) = handler_opt {
let progress: crate::handlers::ProgressNotification = serde_json::from_value(
notification.params.unwrap_or(serde_json::Value::Null),
)
.map_err(|e| {
Error::internal(format!("Invalid progress notification: {}", e))
})?;
if let Err(e) = handler.handle_progress(progress).await {
tracing::error!("Progress handler error: {}", e);
}
} else {
tracing::debug!("Progress notification received (no handler registered)");
}
}
"notifications/message" => {
let handler_opt = self.inner.handlers.lock().get_log_handler();
if let Some(handler) = handler_opt {
let log: crate::handlers::LoggingNotification = serde_json::from_value(
notification.params.unwrap_or(serde_json::Value::Null),
)
.map_err(|e| Error::internal(format!("Invalid log notification: {}", e)))?;
if let Err(e) = handler.handle_log(log).await {
tracing::error!("Log handler error: {}", e);
}
} else {
tracing::debug!("Received log notification but no handler registered");
}
}
"notifications/resources/updated" => {
let handler_opt = self.inner.handlers.lock().get_resource_update_handler();
if let Some(handler) = handler_opt {
let update: crate::handlers::ResourceUpdatedNotification =
serde_json::from_value(
notification.params.unwrap_or(serde_json::Value::Null),
)
.map_err(|e| {
Error::internal(format!("Invalid resource update notification: {}", e))
})?;
if let Err(e) = handler.handle_resource_update(update).await {
tracing::error!("Resource update handler error: {}", e);
}
} else {
tracing::debug!(
"Received resource update notification but no handler registered"
);
}
}
"notifications/resources/list_changed" => {
let handler_opt = self
.inner
.handlers
.lock()
.get_resource_list_changed_handler();
if let Some(handler) = handler_opt {
if let Err(e) = handler.handle_resource_list_changed().await {
tracing::error!("Resource list changed handler error: {}", e);
}
} else {
tracing::debug!(
"Resource list changed notification received (no handler registered)"
);
}
}
"notifications/prompts/list_changed" => {
let handler_opt = self.inner.handlers.lock().get_prompt_list_changed_handler();
if let Some(handler) = handler_opt {
if let Err(e) = handler.handle_prompt_list_changed().await {
tracing::error!("Prompt list changed handler error: {}", e);
}
} else {
tracing::debug!(
"Prompt list changed notification received (no handler registered)"
);
}
}
"notifications/tools/list_changed" => {
let handler_opt = self.inner.handlers.lock().get_tool_list_changed_handler();
if let Some(handler) = handler_opt {
if let Err(e) = handler.handle_tool_list_changed().await {
tracing::error!("Tool list changed handler error: {}", e);
}
} else {
tracing::debug!(
"Tool list changed notification received (no handler registered)"
);
}
}
"notifications/cancelled" => {
let handler_opt = self.inner.handlers.lock().get_cancellation_handler();
if let Some(handler) = handler_opt {
let cancellation: crate::handlers::CancelledNotification =
serde_json::from_value(
notification.params.unwrap_or(serde_json::Value::Null),
)
.map_err(|e| {
Error::internal(format!("Invalid cancellation notification: {}", e))
})?;
if let Err(e) = handler.handle_cancellation(cancellation).await {
tracing::error!("Cancellation handler error: {}", e);
}
} else {
tracing::debug!("Cancellation notification received (no handler registered)");
}
}
_ => {
tracing::debug!("Received unknown notification: {}", notification.method);
}
}
Ok(())
}
/// Serializes `response` to JSON and sends it over the transport.
///
/// The bytes are wrapped in a `TransportMessage` whose id is the fixed string
/// `"response"`.
///
/// # Errors
///
/// Returns an internal error if serialization fails and a transport error if
/// the transport rejects the message.
async fn send_response(&self, response: JsonRpcResponse) -> Result<()> {
    tracing::info!(
        "📤 [send_response] Sending JSON-RPC response: id={:?}",
        response.id
    );

    let bytes = serde_json::to_vec(&response).map_err(|e| {
        tracing::error!("❌ [send_response] Failed to serialize response: {}", e);
        Error::internal(format!("Failed to serialize response: {}", e))
    })?;
    tracing::debug!(
        "📤 [send_response] Response payload: {} bytes",
        bytes.len()
    );
    tracing::debug!(
        "📤 [send_response] Response JSON: {}",
        String::from_utf8_lossy(&bytes)
    );

    let envelope_id = turbomcp_protocol::MessageId::from("response".to_string());
    let envelope = TransportMessage::new(envelope_id, bytes.into());

    let transport = self.inner.protocol.transport();
    transport.send(envelope).await.map_err(|e| {
        tracing::error!("❌ [send_response] Transport send failed: {}", e);
        Error::transport(format!("Failed to send response: {}", e))
    })?;

    tracing::info!(
        "✅ [send_response] Response sent successfully: id={:?}",
        response.id
    );
    Ok(())
}
/// Performs the `initialize` handshake with a request built by this client.
///
/// Sampling, elicitation and roots capabilities are advertised only when the
/// corresponding `get_*_capabilities` helper returns `Some`. The request
/// identifies the client as `turbomcp-client` at this crate's version.
///
/// # Errors
///
/// Propagates any error from [`Client::initialize_with_request`].
pub async fn initialize(&self) -> Result<InitializeResult> {
    let mut advertised = ProtocolClientCapabilities::default();
    if let sampling @ Some(_) = self.get_sampling_capabilities() {
        advertised.sampling = sampling;
    }
    if let elicitation @ Some(_) = self.get_elicitation_capabilities() {
        advertised.elicitation = elicitation;
    }
    if let roots @ Some(_) = self.get_roots_capabilities() {
        advertised.roots = roots;
    }

    let client_info = turbomcp_protocol::types::Implementation {
        name: "turbomcp-client".to_string(),
        version: env!("CARGO_PKG_VERSION").to_string(),
        title: Some("TurboMCP Client".to_string()),
        ..Default::default()
    };

    let handshake = InitializeRequest {
        protocol_version: PROTOCOL_VERSION.into(),
        capabilities: advertised,
        client_info,
        _meta: None,
    };
    self.initialize_with_request(handshake).await
}
/// Performs the `initialize` handshake using a caller-supplied request.
///
/// Steps, in order: connect the transport unless it already reports
/// `Connected`; send `initialize`; mark the client as initialized; send the
/// `notifications/initialized` notification. The initialized flag is set
/// before that notification goes out, so it stays set even if sending the
/// notification fails.
///
/// # Errors
///
/// Returns a transport error if connecting fails, and propagates errors from
/// serializing the request, from the `initialize` call and from sending the
/// notification.
pub async fn initialize_with_request(
    &self,
    request: InitializeRequest,
) -> Result<InitializeResult> {
    let transport = self.inner.protocol.transport();
    match transport.state().await {
        turbomcp_transport::TransportState::Connected => {}
        other_state => {
            tracing::debug!(
                "Auto-connecting transport (current state: {:?})",
                other_state
            );
            transport
                .connect()
                .await
                .map_err(|e| Error::transport(format!("Failed to connect transport: {}", e)))?;
            tracing::info!("Transport connected successfully");
        }
    }

    let params = serde_json::to_value(request)?;
    let handshake: ProtocolInitializeResult = self
        .inner
        .protocol
        .request("initialize", Some(params))
        .await?;

    self.inner.initialized.store(true, Ordering::Relaxed);

    self.inner
        .protocol
        .notify("notifications/initialized", None)
        .await?;

    Ok(InitializeResult {
        server_info: handshake.server_info,
        server_capabilities: handshake.capabilities,
    })
}
/// Sends a `resources/subscribe` request for `uri`.
///
/// # Errors
///
/// Returns an invalid-request error if the client has not been initialized
/// or `uri` is empty, an internal error if the request cannot be serialized,
/// and otherwise whatever the protocol request returns.
pub async fn subscribe(&self, uri: &str) -> Result<EmptyResult> {
    let ready = self.inner.initialized.load(Ordering::Relaxed);
    if !ready {
        return Err(Error::invalid_request("Client not initialized"));
    }
    if uri.is_empty() {
        return Err(Error::invalid_request("Subscription URI cannot be empty"));
    }

    let params = serde_json::to_value(SubscribeRequest { uri: uri.into() }).map_err(|e| {
        Error::internal(format!("Failed to serialize subscribe request: {}", e))
    })?;
    self.inner
        .protocol
        .request("resources/subscribe", Some(params))
        .await
}
/// Sends a `resources/unsubscribe` request for `uri`.
///
/// # Errors
///
/// Returns an invalid-request error if the client has not been initialized
/// or `uri` is empty, an internal error if the request cannot be serialized,
/// and otherwise whatever the protocol request returns.
pub async fn unsubscribe(&self, uri: &str) -> Result<EmptyResult> {
    let ready = self.inner.initialized.load(Ordering::Relaxed);
    if !ready {
        return Err(Error::invalid_request("Client not initialized"));
    }
    if uri.is_empty() {
        return Err(Error::invalid_request("Unsubscription URI cannot be empty"));
    }

    let params = serde_json::to_value(UnsubscribeRequest { uri: uri.into() }).map_err(|e| {
        Error::internal(format!("Failed to serialize unsubscribe request: {}", e))
    })?;
    self.inner
        .protocol
        .request("resources/unsubscribe", Some(params))
        .await
}
/// Returns the capabilities this client was constructed with.
#[must_use]
pub fn capabilities(&self) -> &ClientCapabilities {
&self.inner.capabilities
}
/// Sends a `tasks/get` request for the task identified by `task_id`.
///
/// # Errors
///
/// Returns an internal error if the request cannot be serialized, otherwise
/// whatever the protocol request returns.
#[cfg(feature = "experimental-tasks")]
pub async fn get_task(&self, task_id: &str) -> Result<Task> {
    let lookup = GetTaskRequest {
        task_id: task_id.to_string(),
    };
    let params = serde_json::to_value(lookup)
        .map_err(|e| Error::internal(format!("Failed to serialize get_task request: {}", e)))?;
    self.inner.protocol.request("tasks/get", Some(params)).await
}
/// Sends a `tasks/cancel` request for the task identified by `task_id`.
///
/// # Errors
///
/// Returns an internal error if the request cannot be serialized, otherwise
/// whatever the protocol request returns.
#[cfg(feature = "experimental-tasks")]
pub async fn cancel_task(&self, task_id: &str) -> Result<Task> {
    let cancellation = CancelTaskRequest {
        task_id: task_id.to_string(),
    };
    let params = serde_json::to_value(cancellation).map_err(|e| {
        Error::internal(format!("Failed to serialize cancel_task request: {}", e))
    })?;
    self.inner
        .protocol
        .request("tasks/cancel", Some(params))
        .await
}
/// Sends a `tasks/list` request, forwarding the optional pagination `cursor`
/// and `limit` unchanged.
///
/// # Errors
///
/// Returns an internal error if the request cannot be serialized, otherwise
/// whatever the protocol request returns.
#[cfg(feature = "experimental-tasks")]
pub async fn list_tasks(
    &self,
    cursor: Option<String>,
    limit: Option<usize>,
) -> Result<ListTasksResult> {
    let page = ListTasksRequest { cursor, limit };
    let params = serde_json::to_value(page).map_err(|e| {
        Error::internal(format!("Failed to serialize list_tasks request: {}", e))
    })?;
    self.inner
        .protocol
        .request("tasks/list", Some(params))
        .await
}
/// Sends a `tasks/result` request for the task identified by `task_id`.
///
/// # Errors
///
/// Returns an internal error if the request cannot be serialized, otherwise
/// whatever the protocol request returns.
#[cfg(feature = "experimental-tasks")]
pub async fn get_task_result(&self, task_id: &str) -> Result<GetTaskPayloadResult> {
    let lookup = GetTaskPayloadRequest {
        task_id: task_id.to_string(),
    };
    let params = serde_json::to_value(lookup).map_err(|e| {
        Error::internal(format!(
            "Failed to serialize get_task_result request: {}",
            e
        ))
    })?;
    self.inner
        .protocol
        .request("tasks/result", Some(params))
        .await
}
/// Returns default elicitation capabilities when `has_elicitation_handler()`
/// is true, and `None` otherwise.
fn get_elicitation_capabilities(
    &self,
) -> Option<turbomcp_protocol::types::ElicitationCapabilities> {
    self.has_elicitation_handler()
        .then(|| turbomcp_protocol::types::ElicitationCapabilities::default())
}
/// Returns roots capabilities with `list_changed` set to `Some(true)` when
/// `has_roots_handler()` is true, and `None` otherwise.
fn get_roots_capabilities(&self) -> Option<turbomcp_protocol::types::RootsCapabilities> {
    self.has_roots_handler()
        .then(|| turbomcp_protocol::types::RootsCapabilities {
            list_changed: Some(true),
        })
}
}
// Unit tests for client construction and shutdown, run against a transport
// that performs no I/O.
#[cfg(test)]
mod tests {
use super::*;
use std::future::Future;
use std::pin::Pin;
use turbomcp_transport::{
TransportCapabilities, TransportConfig, TransportMessage, TransportMetrics,
TransportResult, TransportState, TransportType,
};
/// Transport stub: every operation succeeds immediately and does nothing.
#[derive(Debug, Default)]
struct NoopTransport {
capabilities: TransportCapabilities,
}
impl Transport for NoopTransport {
fn transport_type(&self) -> TransportType {
TransportType::Stdio
}
fn capabilities(&self) -> &TransportCapabilities {
&self.capabilities
}
// Always reports `Disconnected`, regardless of earlier `connect` calls.
fn state(&self) -> Pin<Box<dyn Future<Output = TransportState> + Send + '_>> {
Box::pin(async { TransportState::Disconnected })
}
fn connect(&self) -> Pin<Box<dyn Future<Output = TransportResult<()>> + Send + '_>> {
Box::pin(async { Ok(()) })
}
fn disconnect(&self) -> Pin<Box<dyn Future<Output = TransportResult<()>> + Send + '_>> {
Box::pin(async { Ok(()) })
}
// Outgoing messages are discarded.
fn send(
&self,
_message: TransportMessage,
) -> Pin<Box<dyn Future<Output = TransportResult<()>> + Send + '_>> {
Box::pin(async { Ok(()) })
}
// Never yields a message.
fn receive(
&self,
) -> Pin<Box<dyn Future<Output = TransportResult<Option<TransportMessage>>> + Send + '_>>
{
Box::pin(async { Ok(None) })
}
fn metrics(&self) -> Pin<Box<dyn Future<Output = TransportMetrics> + Send + '_>> {
Box::pin(async { TransportMetrics::default() })
}
fn configure(
&self,
_config: TransportConfig,
) -> Pin<Box<dyn Future<Output = TransportResult<()>> + Send + '_>> {
Box::pin(async { Ok(()) })
}
}
/// The handler semaphore must be sized from `max_concurrent_handlers`.
#[tokio::test]
async fn test_with_capabilities_and_config_uses_handler_limit() {
let capabilities = ClientCapabilities {
max_concurrent_handlers: 7,
..Default::default()
};
let client = Client::with_capabilities_and_config(
NoopTransport::default(),
capabilities,
TransportConfig::default(),
);
assert_eq!(client.inner.handler_semaphore.available_permits(), 7);
}
/// `shutdown()` must flip `shutdown_requested` from `false` to `true`.
#[tokio::test]
async fn test_shutdown_sets_shutdown_flag() {
let client = Client::new(NoopTransport::default());
assert!(!client.inner.shutdown_requested.load(Ordering::Relaxed));
client.shutdown().await.expect("shutdown should succeed");
assert!(client.inner.shutdown_requested.load(Ordering::Relaxed));
}
}