mod builder;
mod handler;
mod state;
mod service;
pub use builder::ServerBuilder;
pub use service::{ServerService, ServiceContext, ServiceRequest, ServiceResponse};
use std::sync::Arc;
use std::collections::HashMap;
use tokio::sync::{mpsc, RwLock};
use async_trait::async_trait;
use dashmap::DashMap;
use uuid::Uuid;
use crate::error::Error;
use crate::protocol::{
Implementation, RequestId, ProgressToken,
JSONRPCMessage, JSONRPCNotification,
logging::LoggingLevel,
};
use self::state::{ServerState, Connection};
use self::handler::ServerMessageHandler;
/// Boolean feature flags, plus free-form experimental entries, describing
/// the capabilities configured for a server.
///
/// The `Default` implementation enables only `logging`; every other flag is
/// `false` and `experimental` is empty.
#[derive(Debug, Clone)]
pub struct ServerCapabilities {
/// Logging feature flag; the only flag that defaults to `true`.
pub logging: bool,
/// Completions feature flag.
pub completions: bool,
/// Prompts feature flag.
pub prompts: bool,
/// Presumably whether prompt list-change notifications are offered —
/// TODO confirm against the message handler.
pub prompts_list_changed: bool,
/// Resources feature flag.
pub resources: bool,
/// Presumably whether resource list-change notifications are offered —
/// TODO confirm against the message handler.
pub resources_list_changed: bool,
/// Presumably whether per-resource subscriptions are offered —
/// TODO confirm against the message handler.
pub resources_subscribe: bool,
/// Tools feature flag.
pub tools: bool,
/// Presumably whether tool list-change notifications are offered —
/// TODO confirm against the message handler.
pub tools_list_changed: bool,
/// Arbitrary JSON values keyed by name for experimental capabilities.
pub experimental: HashMap<String, serde_json::Value>,
}
impl Default for ServerCapabilities {
fn default() -> Self {
Self {
logging: true,
completions: false,
prompts: false,
prompts_list_changed: false,
resources: false,
resources_list_changed: false,
resources_subscribe: false,
tools: false,
tools_list_changed: false,
experimental: HashMap::new(),
}
}
}
/// Configuration handed to `Server::new`.
///
/// `Server::new` stores one copy on the server and passes a clone to the
/// message handler.
#[derive(Debug, Clone)]
pub struct ServerOptions {
/// Name/version descriptor of this server implementation. Defaults to
/// `"mcpx-server"` with the crate's `CARGO_PKG_VERSION`.
pub implementation: Implementation,
/// Capability flags for the server. Defaults to
/// `ServerCapabilities::default()`.
pub capabilities: ServerCapabilities,
/// Optional free-form instructions text. Defaults to `None`.
/// NOTE(review): presumably surfaced to clients during initialization —
/// confirm in the handler.
pub instructions: Option<String>,
/// Defaults to `true`.
/// NOTE(review): presumably makes the handler answer ping requests on its
/// own — confirm in the handler.
pub auto_acknowledge_ping: bool,
/// Default timeout in milliseconds. Defaults to `30000`.
/// NOTE(review): nothing in this file reads it; confirm where it is applied.
pub default_timeout_ms: u64,
}
impl Default for ServerOptions {
fn default() -> Self {
Self {
implementation: Implementation::new("mcpx-server", env!("CARGO_PKG_VERSION")),
capabilities: ServerCapabilities::default(),
instructions: None,
auto_acknowledge_ping: true,
default_timeout_ms: 30000, }
}
}
/// Events carried by the channel whose receiving half is returned from
/// `Server::new`.
///
/// NOTE(review): no code in this file constructs these variants; they are
/// presumably emitted by the message handler, which receives a clone of the
/// sender — confirm there.
#[derive(Debug, Clone)]
pub enum ServerEvent {
/// A client connection was established.
ClientConnected {
/// Identifier of the client connection.
client_id: String,
/// Implementation descriptor associated with the client.
client_info: Implementation,
/// Protocol version string associated with the connection.
protocol_version: String,
/// Capability flags associated with the client.
capabilities: ClientCapabilities,
},
/// A client connection ended.
ClientDisconnected {
/// Identifier of the client connection.
client_id: String,
/// Human-readable reason for the disconnect.
reason: String,
},
/// The roots associated with a client changed.
RootsUpdated {
/// Identifier of the client connection.
client_id: String,
},
/// An error occurred, optionally tied to a specific client.
Error {
/// Client the error relates to, or `None` when it is not client-specific.
client_id: Option<String>,
/// The error itself.
error: Error,
},
}
/// Boolean feature flags, plus free-form experimental entries, describing a
/// client's capabilities.
///
/// The derived `Default` leaves every flag `false` and `experimental` empty.
#[derive(Debug, Clone, Default)]
pub struct ClientCapabilities {
/// Roots feature flag.
pub roots: bool,
/// Presumably whether the client sends roots list-change notifications —
/// TODO confirm against the message handler.
pub roots_list_changed: bool,
/// Sampling feature flag.
pub sampling: bool,
/// Arbitrary JSON values keyed by name for experimental capabilities.
pub experimental: HashMap<String, serde_json::Value>,
}
/// Server instance: tracks client connections, lifecycle state and the
/// collaborators created in `Server::new`.
pub struct Server {
// Unique identifier; `Server::new` fills it with a freshly generated UUID v4 string.
id: String,
// Lifecycle state (running / stopping / stopped), shared with the message handler.
state: Arc<RwLock<ServerState>>,
// Connections keyed by client id, shared with the message handler.
connections: Arc<DashMap<String, Connection>>,
// Sending half of the event channel; the receiving half is returned by `Server::new`.
event_sender: mpsc::Sender<ServerEvent>,
// Options this server was created with; the handler holds its own clone.
options: ServerOptions,
// Delegate that processes incoming JSON-RPC messages in `handle_message`.
handler: Arc<ServerMessageHandler>,
// Service implementation supplied by the caller of `Server::new`.
// NOTE(review): `Arc<Box<dyn _>>` is a double indirection; `Arc<dyn _>` would
// likely suffice — confirm no other module relies on the `Box` layer.
service: Arc<Box<dyn ServerService + Send + Sync>>,
}
/// Asynchronous consumer of [`ServerEvent`] values.
///
/// Implementors must be `Send + Sync`.
/// NOTE(review): nothing in this file registers or invokes listeners; how
/// events reach an implementor is not shown here — confirm with the caller
/// that drains the event receiver.
#[async_trait]
pub trait EventListener: Send + Sync {
/// Called with each event, which is passed by value.
async fn on_event(&self, event: ServerEvent);
}
impl Server {
    /// Capacity of the bounded channel that carries [`ServerEvent`]s.
    const EVENT_CHANNEL_CAPACITY: usize = 100;

    /// Creates a server with a fresh UUID v4 identifier.
    ///
    /// Returns the server together with the receiving half of its event
    /// channel. The message handler is built here and shares the server's
    /// state, connection map, event sender and a clone of `options`.
    pub fn new(
        options: ServerOptions,
        service: Box<dyn ServerService + Send + Sync>,
    ) -> (Self, mpsc::Receiver<ServerEvent>) {
        let id = Uuid::new_v4().to_string();
        let (event_sender, event_receiver) = mpsc::channel(Self::EVENT_CHANNEL_CAPACITY);
        let state = Arc::new(RwLock::new(ServerState::new()));
        let connections = Arc::new(DashMap::new());
        let handler = Arc::new(ServerMessageHandler::new(
            Arc::clone(&state),
            Arc::clone(&connections),
            event_sender.clone(),
            options.clone(),
        ));

        let server = Self {
            id,
            state,
            connections,
            event_sender,
            options,
            handler,
            service: Arc::new(service),
        };
        (server, event_receiver)
    }

    /// Returns this server's unique identifier.
    pub fn id(&self) -> &str {
        &self.id
    }

    /// Marks the server as running.
    ///
    /// # Errors
    ///
    /// Currently never fails.
    pub async fn start(&self) -> Result<(), Error> {
        self.state.write().await.set_running();
        Ok(())
    }

    /// Stops the server: marks it as stopping, drops every tracked
    /// connection, then marks it as stopped.
    ///
    /// The state write lock is held for the whole sequence.
    ///
    /// # Errors
    ///
    /// Currently never fails.
    pub async fn stop(&self) -> Result<(), Error> {
        let mut state = self.state.write().await;
        state.set_stopping();
        self.connections.clear();
        state.set_stopped();
        Ok(())
    }

    /// Registers a new connection under `id`.
    ///
    /// An existing connection with the same id is replaced.
    ///
    /// # Errors
    ///
    /// Currently never fails.
    pub async fn add_connection(&self, id: &str) -> Result<(), Error> {
        self.connections.insert(id.to_string(), Connection::new(id));
        Ok(())
    }

    /// Forgets the connection registered under `id`.
    ///
    /// Removing an id that is not registered is not an error.
    ///
    /// # Errors
    ///
    /// Currently never fails.
    pub async fn remove_connection(&self, id: &str) -> Result<(), Error> {
        self.connections.remove(id);
        Ok(())
    }

    /// Passes an incoming message from `client_id` to the message handler and
    /// returns the handler's optional reply.
    ///
    /// # Errors
    ///
    /// Returns `Error::InternalError` when `client_id` is not a registered
    /// connection, or whatever error the handler reports.
    pub async fn handle_message(
        &self,
        client_id: &str,
        message: JSONRPCMessage,
    ) -> Result<Option<JSONRPCMessage>, Error> {
        self.ensure_known_client(client_id)?;
        let response = self.handler.handle_message(client_id, message).await?;
        Ok(response)
    }

    /// Validates that `client_id` is a registered connection.
    ///
    /// The notification itself is currently unused: this method does not
    /// transmit anything.
    ///
    /// # Errors
    ///
    /// Returns `Error::InternalError` when `client_id` is not registered.
    pub async fn send_notification(
        &self,
        client_id: &str,
        _notification: JSONRPCNotification,
    ) -> Result<(), Error> {
        self.ensure_known_client(client_id)
    }

    /// Builds a `notifications/message` notification carrying `level` and
    /// `message` and hands it to [`Server::send_notification`].
    ///
    /// # Errors
    ///
    /// Returns `Error::InternalError` for an unknown client and
    /// `Error::UnsupportedFeature` when the connection's `logging`
    /// capability is off.
    pub async fn send_log(
        &self,
        client_id: &str,
        level: LoggingLevel,
        message: &str,
    ) -> Result<(), Error> {
        self.require_capability(client_id, "Logging", |connection| {
            connection.capabilities.logging
        })?;
        let notification = Self::build_notification(
            "notifications/message",
            Some(serde_json::json!({
                "level": level,
                "data": message
            })),
        );
        self.send_notification(client_id, notification).await
    }

    /// Builds a `notifications/resources/list_changed` notification and hands
    /// it to [`Server::send_notification`].
    ///
    /// # Errors
    ///
    /// Returns `Error::InternalError` for an unknown client and
    /// `Error::UnsupportedFeature` when the connection's
    /// `resources_list_changed` capability is off.
    pub async fn notify_resources_changed(&self, client_id: &str) -> Result<(), Error> {
        self.require_capability(
            client_id,
            "Resource list changed notifications",
            |connection| connection.capabilities.resources_list_changed,
        )?;
        let notification = Self::build_notification("notifications/resources/list_changed", None);
        self.send_notification(client_id, notification).await
    }

    /// Builds a `notifications/resources/updated` notification for `uri` and
    /// hands it to [`Server::send_notification`].
    ///
    /// # Errors
    ///
    /// Returns `Error::InternalError` for an unknown client and
    /// `Error::UnsupportedFeature` when the connection's
    /// `resources_subscribe` capability is off.
    pub async fn notify_resource_updated(
        &self,
        client_id: &str,
        uri: &str,
    ) -> Result<(), Error> {
        self.require_capability(client_id, "Resource subscriptions", |connection| {
            connection.capabilities.resources_subscribe
        })?;
        let notification = Self::build_notification(
            "notifications/resources/updated",
            Some(serde_json::json!({
                "uri": uri
            })),
        );
        self.send_notification(client_id, notification).await
    }

    /// Builds a `notifications/prompts/list_changed` notification and hands
    /// it to [`Server::send_notification`].
    ///
    /// # Errors
    ///
    /// Returns `Error::InternalError` for an unknown client and
    /// `Error::UnsupportedFeature` when the connection's
    /// `prompts_list_changed` capability is off.
    pub async fn notify_prompts_changed(&self, client_id: &str) -> Result<(), Error> {
        self.require_capability(
            client_id,
            "Prompts list changed notifications",
            |connection| connection.capabilities.prompts_list_changed,
        )?;
        let notification = Self::build_notification("notifications/prompts/list_changed", None);
        self.send_notification(client_id, notification).await
    }

    /// Builds a `notifications/tools/list_changed` notification and hands it
    /// to [`Server::send_notification`].
    ///
    /// # Errors
    ///
    /// Returns `Error::InternalError` for an unknown client and
    /// `Error::UnsupportedFeature` when the connection's
    /// `tools_list_changed` capability is off.
    pub async fn notify_tools_changed(&self, client_id: &str) -> Result<(), Error> {
        self.require_capability(
            client_id,
            "Tools list changed notifications",
            |connection| connection.capabilities.tools_list_changed,
        )?;
        let notification = Self::build_notification("notifications/tools/list_changed", None);
        self.send_notification(client_id, notification).await
    }

    /// Builds a `notifications/progress` notification and hands it to
    /// [`Server::send_notification`].
    ///
    /// `total` and `message` are only included in the payload when present.
    ///
    /// # Errors
    ///
    /// Returns `Error::InternalError` when `client_id` is not registered.
    pub async fn send_progress(
        &self,
        client_id: &str,
        token: ProgressToken,
        progress: f64,
        total: Option<f64>,
        message: Option<&str>,
    ) -> Result<(), Error> {
        let mut params = serde_json::json!({
            "progressToken": token,
            "progress": progress
        });
        if let Some(total) = total {
            params["total"] = serde_json::json!(total);
        }
        if let Some(message) = message {
            params["message"] = serde_json::json!(message);
        }
        let notification = Self::build_notification("notifications/progress", Some(params));
        self.send_notification(client_id, notification).await
    }

    /// Builds a `notifications/cancelled` notification for `request_id` and
    /// hands it to [`Server::send_notification`].
    ///
    /// `reason` is only included in the payload when present, mirroring how
    /// [`Server::send_progress`] treats its optional fields, so an absent
    /// reason is never serialized as JSON `null`.
    ///
    /// # Errors
    ///
    /// Returns `Error::InternalError` when `client_id` is not registered.
    pub async fn cancel_request(
        &self,
        client_id: &str,
        request_id: RequestId,
        reason: Option<String>,
    ) -> Result<(), Error> {
        let mut params = serde_json::json!({
            "requestId": request_id
        });
        if let Some(reason) = reason {
            params["reason"] = serde_json::json!(reason);
        }
        let notification = Self::build_notification("notifications/cancelled", Some(params));
        self.send_notification(client_id, notification).await
    }

    /// Validates that `client_id` is registered and that its connection has
    /// the `roots` capability.
    ///
    /// No request is actually issued by this method at present.
    ///
    /// # Errors
    ///
    /// Returns `Error::InternalError` for an unknown client and
    /// `Error::UnsupportedFeature` when the `roots` capability is off.
    pub async fn request_roots(&self, client_id: &str) -> Result<(), Error> {
        self.require_capability(client_id, "Roots", |connection| {
            connection.capabilities.roots
        })
    }

    /// Builds the error reported for a client id with no registered connection.
    fn unknown_client_error(client_id: &str) -> Error {
        Error::InternalError(format!("Unknown client: {}", client_id))
    }

    /// Succeeds only when `client_id` has a registered connection.
    fn ensure_known_client(&self, client_id: &str) -> Result<(), Error> {
        if self.connections.contains_key(client_id) {
            Ok(())
        } else {
            Err(Self::unknown_client_error(client_id))
        }
    }

    /// Checks that `client_id` is registered and that `is_supported` holds
    /// for its connection; `feature` names the capability in the error.
    ///
    /// The map guard lives only for the duration of the predicate call, so
    /// callers never hold it across an `.await` or while touching the
    /// connection map again.
    fn require_capability(
        &self,
        client_id: &str,
        feature: &str,
        is_supported: impl FnOnce(&Connection) -> bool,
    ) -> Result<(), Error> {
        let supported = self
            .connections
            .get(client_id)
            .map(|connection| is_supported(connection.value()))
            .ok_or_else(|| Self::unknown_client_error(client_id))?;

        if supported {
            Ok(())
        } else {
            Err(Error::UnsupportedFeature(feature.to_string()))
        }
    }

    /// Assembles a JSON-RPC 2.0 notification for `method` with optional `params`.
    fn build_notification(method: &str, params: Option<serde_json::Value>) -> JSONRPCNotification {
        JSONRPCNotification {
            jsonrpc: "2.0".to_string(),
            method: method.to_string(),
            params,
        }
    }
}