#[cfg(test)]
mod tests {
use std::sync::{Arc, Mutex};
use async_trait::async_trait;
use tmcp::{
ClientCtx, ClientHandler, Result, ServerCtx, ServerHandler, schema,
testutils::{connected_client_and_server_with_conn, shutdown_client_and_server},
};
use tokio::{
sync::oneshot,
time::{Duration, sleep, timeout},
};
use tracing_subscriber::fmt;
#[derive(Clone)]
struct NotificationRecorder {
tx: Arc<Mutex<Option<oneshot::Sender<()>>>>,
}
#[async_trait]
impl ClientHandler for NotificationRecorder {
async fn notification(
&self,
_context: &ClientCtx,
notification: schema::ServerNotification,
) -> Result<()> {
tracing::info!("Client received notification: {:?}", notification);
if matches!(
notification,
schema::ServerNotification::ToolListChanged { .. }
) && let Some(tx) = self.tx.lock().unwrap().take()
{
tx.send(()).ok();
}
Ok(())
}
}
struct NotifyingServer {
sent_notification: Arc<Mutex<bool>>,
}
#[async_trait]
impl ServerHandler for NotifyingServer {
async fn on_connect(&self, context: &ServerCtx, _remote_addr: &str) -> Result<()> {
let sent_notification = self.sent_notification.clone();
let context = context.clone();
tokio::spawn(async move {
sleep(Duration::from_millis(500)).await;
match context.notify(schema::ServerNotification::ToolListChanged { _meta: None }) {
Ok(_) => {
tracing::info!("Server sent roots_list_changed notification");
*sent_notification.lock().unwrap() = true;
}
Err(e) => {
tracing::error!("Failed to send notification: {:?}", e);
}
}
});
Ok(())
}
async fn initialize(
&self,
_context: &ServerCtx,
_protocol_version: String,
_capabilities: schema::ClientCapabilities,
_client_info: schema::Implementation,
) -> Result<schema::InitializeResult> {
Ok(schema::InitializeResult::new("notifying-server").with_version("0.1.0"))
}
}
#[tokio::test]
async fn test_server_to_client_notifications() {
fmt::try_init().ok();
let (tx_notif, rx_notif) = oneshot::channel::<()>();
let sent_notification = Arc::new(Mutex::new(false));
let (mut client, server_handle) = connected_client_and_server_with_conn(
{
let sent_notification = sent_notification.clone();
move || {
Box::new(NotifyingServer {
sent_notification: sent_notification.clone(),
})
}
},
NotificationRecorder {
tx: Arc::new(Mutex::new(Some(tx_notif))),
},
)
.await
.expect("Failed to connect client and server");
client.init().await.expect("Failed to initialize");
let result = timeout(Duration::from_secs(3), rx_notif).await;
assert!(
*sent_notification.lock().unwrap(),
"Server did not send notification"
);
assert!(result.is_ok(), "Timeout waiting for notification");
assert!(result.unwrap().is_ok(), "Failed to receive notification");
shutdown_client_and_server(client, server_handle).await;
}
}