mcpx 0.1.5

A Rust SDK for the Model Context Protocol (MCP)
Documentation
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
//! MCP server implementation
//!
//! This module provides a server implementation for the Model Context Protocol (MCP).
//! It handles connection management, protocol encoding/decoding, and provides
//! a convenient API for implementing MCP servers.

mod builder;
mod handler;
mod state;
mod service;

pub use builder::ServerBuilder;
pub use service::{ServerService, ServiceContext, ServiceRequest, ServiceResponse};

use std::sync::Arc;
use std::collections::HashMap;
use tokio::sync::{mpsc, RwLock};
use async_trait::async_trait;
use dashmap::DashMap;
use uuid::Uuid;

use crate::error::Error;
use crate::protocol::{
    Implementation, RequestId, ProgressToken,
    JSONRPCMessage, JSONRPCNotification,
    logging::LoggingLevel,
};

use self::state::{ServerState, Connection};
use self::handler::ServerMessageHandler;

/// Server capability flags and settings
///
/// Each boolean field records whether the server supports one feature. The
/// `Default` implementation in this module enables only `logging` and leaves
/// every other flag `false` with an empty `experimental` map.
#[derive(Debug, Clone)]
pub struct ServerCapabilities {
    /// Whether the server supports sending log messages
    pub logging: bool,
    /// Whether the server supports completions
    pub completions: bool,
    /// Whether the server supports prompts
    pub prompts: bool,
    /// Whether the server supports prompts list changed notifications
    pub prompts_list_changed: bool,
    /// Whether the server supports resources
    pub resources: bool,
    /// Whether the server supports resource list changed notifications
    pub resources_list_changed: bool,
    /// Whether the server supports resource subscriptions
    pub resources_subscribe: bool,
    /// Whether the server supports tools
    pub tools: bool,
    /// Whether the server supports tool list changed notifications
    pub tools_list_changed: bool,
    /// Experimental capabilities, stored as arbitrary JSON values keyed by string
    pub experimental: HashMap<String, serde_json::Value>,
}

impl Default for ServerCapabilities {
    fn default() -> Self {
        Self {
            logging: true,
            completions: false,
            prompts: false,
            prompts_list_changed: false,
            resources: false,
            resources_list_changed: false,
            resources_subscribe: false,
            tools: false,
            tools_list_changed: false,
            experimental: HashMap::new(),
        }
    }
}

/// MCP server options
///
/// A copy of these options is handed to the message handler when a `Server`
/// is constructed, and the server keeps the original. The `Default`
/// implementation in this module uses the name `"mcpx-server"`, the crate
/// version, default capabilities, no instructions, automatic ping
/// acknowledgement, and a 30 000 ms timeout.
#[derive(Debug, Clone)]
pub struct ServerOptions {
    /// Server implementation info
    pub implementation: Implementation,
    /// Server capabilities
    pub capabilities: ServerCapabilities,
    /// Server instructions
    // NOTE(review): presumably surfaced to clients during initialization;
    // nothing in this file reads it — confirm in the handler module.
    pub instructions: Option<String>,
    /// Automatically acknowledge ping requests
    pub auto_acknowledge_ping: bool,
    /// Default timeout for requests in milliseconds (0 = no timeout)
    // NOTE(review): not enforced anywhere in this file — confirm where the
    // timeout is applied.
    pub default_timeout_ms: u64,
}

impl Default for ServerOptions {
    fn default() -> Self {
        Self {
            implementation: Implementation::new("mcpx-server", env!("CARGO_PKG_VERSION")),
            capabilities: ServerCapabilities::default(),
            instructions: None,
            auto_acknowledge_ping: true,
            default_timeout_ms: 30000, // 30 seconds
        }
    }
}

/// An event that can be emitted by the server
///
/// Values of this type travel over the `mpsc` channel whose receiving half is
/// returned by `Server::new`. Nothing in this file sends events directly; the
/// sender is cloned into the message handler.
// NOTE(review): presumably the handler module is what actually emits these —
// confirm there.
#[derive(Debug, Clone)]
pub enum ServerEvent {
    /// Client connected
    ClientConnected {
        /// Client ID
        client_id: String,
        /// Client implementation info
        client_info: Implementation,
        /// Client protocol version
        protocol_version: String,
        /// Client capabilities
        capabilities: ClientCapabilities,
    },
    /// Client disconnected
    ClientDisconnected {
        /// Client ID
        client_id: String,
        /// Reason for disconnection
        reason: String,
    },
    /// Roots list updated
    RootsUpdated {
        /// Client ID
        client_id: String,
    },
    /// Error occurred
    Error {
        /// Client ID (if any)
        client_id: Option<String>,
        /// Error details
        error: Error,
    },
}

/// Client capabilities
///
/// Carried in `ServerEvent::ClientConnected`. The derived `Default` yields
/// all flags `false` and an empty `experimental` map.
#[derive(Debug, Clone, Default)]
pub struct ClientCapabilities {
    /// Whether the client supports listing roots
    pub roots: bool,
    /// Whether the client supports notifications for changes to the roots list
    pub roots_list_changed: bool,
    /// Whether the client supports sampling from an LLM
    pub sampling: bool,
    /// Experimental capabilities, stored as arbitrary JSON values keyed by string
    pub experimental: HashMap<String, serde_json::Value>,
}

/// MCP server
///
/// Owns the shared server state, the table of client connections, the sending
/// half of the event channel, and the message handler that incoming messages
/// are delegated to. Construct it with `Server::new`.
pub struct Server {
    /// Unique server identifier (a UUID v4 string generated in `Server::new`)
    id: String,
    /// Server state, shared with the message handler behind an async `RwLock`
    state: Arc<RwLock<ServerState>>,
    /// Connection tracking, keyed by client ID and shared with the message handler
    connections: Arc<DashMap<String, Connection>>,
    /// Event sender; the matching receiver is returned from `Server::new`
    event_sender: mpsc::Sender<ServerEvent>,
    /// Server options
    options: ServerOptions,
    /// Message handler that `handle_message` delegates to
    handler: Arc<ServerMessageHandler>,
    /// Service implementation
    // NOTE(review): `Arc<Box<dyn …>>` is a double indirection and no method in
    // this file reads the field — confirm whether `Arc<dyn …>` would suffice.
    service: Arc<Box<dyn ServerService + Send + Sync>>,
}

/// Server event listener
///
/// Implementors receive each `ServerEvent` by value. The trait requires
/// `Send + Sync`, and `#[async_trait]` is what allows the `async fn` in the
/// trait definition.
// NOTE(review): nothing in this file registers or invokes listeners —
// presumably callers drive this from the receiver returned by `Server::new`.
#[async_trait]
pub trait EventListener: Send + Sync {
    /// Called when a server event occurs
    async fn on_event(&self, event: ServerEvent);
}

impl Server {
    /// Build a server from `options` and a boxed `service`.
    ///
    /// Returns the server together with the receiving half of its event
    /// channel (bounded to 100 pending events). The server is assigned a fresh
    /// UUID v4 as its identifier, and the message handler shares the server's
    /// state and connection table.
    pub fn new(
        options: ServerOptions,
        service: Box<dyn ServerService + Send + Sync>,
    ) -> (Self, mpsc::Receiver<ServerEvent>) {
        let (event_sender, event_receiver) = mpsc::channel(100);
        let state = Arc::new(RwLock::new(ServerState::new()));
        let connections = Arc::new(DashMap::new());

        // The handler gets its own handles to everything it shares with the server.
        let handler = ServerMessageHandler::new(
            Arc::clone(&state),
            Arc::clone(&connections),
            event_sender.clone(),
            options.clone(),
        );

        (
            Self {
                id: Uuid::new_v4().to_string(),
                state,
                connections,
                event_sender,
                options,
                handler: Arc::new(handler),
                service: Arc::new(service),
            },
            event_receiver,
        )
    }

    /// Borrow this server's unique identifier.
    pub fn id(&self) -> &str {
        self.id.as_str()
    }

    /// Mark the server as running.
    ///
    /// Takes the state write lock, flips the state to running, and releases
    /// the lock. Always returns `Ok(())`.
    pub async fn start(&self) -> Result<(), Error> {
        self.state.write().await.set_running();
        Ok(())
    }

    /// Shut the server down.
    ///
    /// While holding the state write lock, the state moves to stopping, every
    /// tracked connection is dropped from the connection table, and the state
    /// then moves to stopped. Always returns `Ok(())`.
    pub async fn stop(&self) -> Result<(), Error> {
        let mut guard = self.state.write().await;

        // stopping -> (drop all clients) -> stopped, all under one write lock.
        guard.set_stopping();
        self.connections.clear();
        guard.set_stopped();

        Ok(())
    }

    /// Register a new connection under `id`.
    ///
    /// An existing connection with the same `id` is replaced. Always returns
    /// `Ok(())`.
    pub async fn add_connection(&self, id: &str) -> Result<(), Error> {
        let fresh = Connection::new(id);
        // `insert` hands back any entry it displaced; it is intentionally discarded.
        let _ = self.connections.insert(id.to_owned(), fresh);
        Ok(())
    }

    /// Forget the connection registered under `id`.
    ///
    /// Removing an unknown `id` is not an error. Always returns `Ok(())`.
    pub async fn remove_connection(&self, id: &str) -> Result<(), Error> {
        // `remove` yields the evicted entry, if any; it is intentionally discarded.
        let _ = self.connections.remove(id);
        Ok(())
    }

    /// Handle an incoming message from a client
    pub async fn handle_message(
        &self,
        client_id: &str,
        message: JSONRPCMessage,
    ) -> Result<Option<JSONRPCMessage>, Error> {
        // Check if client exists
        if !self.connections.contains_key(client_id) {
            return Err(Error::InternalError(format!("Unknown client: {}", client_id)));
        }

        // Handle the message
        let response = self.handler.handle_message(client_id, message).await?;

        Ok(response)
    }

    /// Send a notification to a specific client
    pub async fn send_notification(
        &self,
        client_id: &str,
        _notification: JSONRPCNotification,
    ) -> Result<(), Error> {
        // Check if client exists
        if !self.connections.contains_key(client_id) {
            return Err(Error::InternalError(format!("Unknown client: {}", client_id)));
        }

        // Send notification through transport - this would be handled elsewhere
        // For now, we just return success
        Ok(())
    }

    /// Send a log message to a client
    pub async fn send_log(
        &self,
        client_id: &str,
        level: LoggingLevel,
        message: &str,
    ) -> Result<(), Error> {
        // Get the connection
        let connection = self.connections.get(client_id).ok_or_else(|| {
            Error::InternalError(format!("Unknown client: {}", client_id))
        })?;

        // Check if client supports logging
        if !connection.capabilities.logging {
            return Err(Error::UnsupportedFeature("Logging".to_string()));
        }

        // Create notification
        let notification = JSONRPCNotification {
            jsonrpc: "2.0".to_string(),
            method: "notifications/message".to_string(),
            params: Some(serde_json::json!({
                "level": level,
                "data": message
            })),
        };

        // Send notification
        self.send_notification(client_id, notification).await
    }

    /// Notify a client that resources have changed
    pub async fn notify_resources_changed(&self, client_id: &str) -> Result<(), Error> {
        // Get the connection
        let connection = self.connections.get(client_id).ok_or_else(|| {
            Error::InternalError(format!("Unknown client: {}", client_id))
        })?;

        // Check if client supports resource list changed notifications
        if !connection.capabilities.resources_list_changed {
            return Err(Error::UnsupportedFeature("Resource list changed notifications".to_string()));
        }

        // Create notification
        let notification = JSONRPCNotification {
            jsonrpc: "2.0".to_string(),
            method: "notifications/resources/list_changed".to_string(),
            params: None,
        };

        // Send notification
        self.send_notification(client_id, notification).await
    }

    /// Notify a client that a resource has been updated
    pub async fn notify_resource_updated(
        &self,
        client_id: &str,
        uri: &str,
    ) -> Result<(), Error> {
        // Get the connection
        let connection = self.connections.get(client_id).ok_or_else(|| {
            Error::InternalError(format!("Unknown client: {}", client_id))
        })?;

        // Check if client supports resource subscriptions
        if !connection.capabilities.resources_subscribe {
            return Err(Error::UnsupportedFeature("Resource subscriptions".to_string()));
        }

        // Create notification
        let notification = JSONRPCNotification {
            jsonrpc: "2.0".to_string(),
            method: "notifications/resources/updated".to_string(),
            params: Some(serde_json::json!({
                "uri": uri
            })),
        };

        // Send notification
        self.send_notification(client_id, notification).await
    }

    /// Notify a client that prompts have changed
    pub async fn notify_prompts_changed(&self, client_id: &str) -> Result<(), Error> {
        // Get the connection
        let connection = self.connections.get(client_id).ok_or_else(|| {
            Error::InternalError(format!("Unknown client: {}", client_id))
        })?;

        // Check if client supports prompts list changed notifications
        if !connection.capabilities.prompts_list_changed {
            return Err(Error::UnsupportedFeature("Prompts list changed notifications".to_string()));
        }

        // Create notification
        let notification = JSONRPCNotification {
            jsonrpc: "2.0".to_string(),
            method: "notifications/prompts/list_changed".to_string(),
            params: None,
        };

        // Send notification
        self.send_notification(client_id, notification).await
    }

    /// Notify a client that tools have changed
    pub async fn notify_tools_changed(&self, client_id: &str) -> Result<(), Error> {
        // Get the connection
        let connection = self.connections.get(client_id).ok_or_else(|| {
            Error::InternalError(format!("Unknown client: {}", client_id))
        })?;

        // Check if client supports tools list changed notifications
        if !connection.capabilities.tools_list_changed {
            return Err(Error::UnsupportedFeature("Tools list changed notifications".to_string()));
        }

        // Create notification
        let notification = JSONRPCNotification {
            jsonrpc: "2.0".to_string(),
            method: "notifications/tools/list_changed".to_string(),
            params: None,
        };

        // Send notification
        self.send_notification(client_id, notification).await
    }

    /// Report progress for the operation identified by `token`.
    ///
    /// Sends a `notifications/progress` notification whose params always
    /// contain `progressToken` and `progress`; `total` and `message` are added
    /// only when supplied. The notification is forwarded through
    /// [`Server::send_notification`], whose errors are returned unchanged.
    pub async fn send_progress(
        &self,
        client_id: &str,
        token: ProgressToken,
        progress: f64,
        total: Option<f64>,
        message: Option<&str>,
    ) -> Result<(), Error> {
        // Assemble the params object field by field: required ones first,
        // optional ones only when present.
        let mut fields = serde_json::Map::new();
        fields.insert("progressToken".to_string(), serde_json::json!(token));
        fields.insert("progress".to_string(), serde_json::json!(progress));

        if let Some(total) = total {
            fields.insert("total".to_string(), serde_json::json!(total));
        }
        if let Some(message) = message {
            fields.insert("message".to_string(), serde_json::json!(message));
        }

        self.send_notification(
            client_id,
            JSONRPCNotification {
                jsonrpc: "2.0".to_string(),
                method: "notifications/progress".to_string(),
                params: Some(serde_json::Value::Object(fields)),
            },
        )
        .await
    }

    /// Tell a client that the request identified by `request_id` is cancelled.
    ///
    /// Sends a `notifications/cancelled` notification whose params always
    /// contain `requestId`. The optional `reason` is included only when one is
    /// supplied, rather than being emitted as JSON `null`; this matches how
    /// `send_progress` treats its optional fields. The notification is
    /// forwarded through [`Server::send_notification`], whose errors are
    /// returned unchanged.
    pub async fn cancel_request(
        &self,
        client_id: &str,
        request_id: RequestId,
        reason: Option<String>,
    ) -> Result<(), Error> {
        let mut params = serde_json::json!({
            "requestId": request_id
        });

        // Omit the key entirely when there is no reason: `reason` is an
        // optional string, and `null` is not a string.
        if let Some(reason) = reason {
            params["reason"] = serde_json::Value::String(reason);
        }

        let notification = JSONRPCNotification {
            jsonrpc: "2.0".to_string(),
            method: "notifications/cancelled".to_string(),
            params: Some(params),
        };

        self.send_notification(client_id, notification).await
    }

    /// Request roots from a client
    pub async fn request_roots(&self, client_id: &str) -> Result<(), Error> {
        // Get the connection
        let connection = self.connections.get(client_id).ok_or_else(|| {
            Error::InternalError(format!("Unknown client: {}", client_id))
        })?;

        // Check if client supports roots
        if !connection.capabilities.roots {
            return Err(Error::UnsupportedFeature("Roots".to_string()));
        }

        // Create request (would be sent in a real implementation)
        // JSONRPCRequest {
        //     jsonrpc: "2.0".to_string(),
        //     id: Uuid::new_v4().to_string().into(),
        //     method: "roots/list".to_string(),
        //     params: None,
        // };

        // Send request - again, this would be handled elsewhere
        // For now, just track the request

        Ok(())
    }
}