1use async_trait::async_trait;
24use chrono::{DateTime, Utc};
25use serde::{Deserialize, Serialize};
26use std::collections::HashMap;
27use std::sync::Arc;
28use std::time::Duration;
29use tokio::sync::{mpsc, RwLock};
30use uuid::Uuid;
31
32use crate::mcp::connection_manager::ConnectionManager;
33use crate::mcp::error::{McpError, McpResult};
34use crate::mcp::transport::McpRequest;
35
36#[derive(Debug, Clone, Serialize, Deserialize)]
40pub struct McpResource {
41 pub uri: String,
43 pub name: String,
45 pub description: Option<String>,
47 #[serde(rename = "mimeType")]
49 pub mime_type: Option<String>,
50 pub server_name: String,
52}
53
54impl McpResource {
55 pub fn new(
57 uri: impl Into<String>,
58 name: impl Into<String>,
59 server_name: impl Into<String>,
60 ) -> Self {
61 Self {
62 uri: uri.into(),
63 name: name.into(),
64 description: None,
65 mime_type: None,
66 server_name: server_name.into(),
67 }
68 }
69
70 pub fn with_details(
72 uri: impl Into<String>,
73 name: impl Into<String>,
74 server_name: impl Into<String>,
75 description: Option<String>,
76 mime_type: Option<String>,
77 ) -> Self {
78 Self {
79 uri: uri.into(),
80 name: name.into(),
81 description,
82 mime_type,
83 server_name: server_name.into(),
84 }
85 }
86}
87
88#[derive(Debug, Clone, Serialize, Deserialize)]
92pub struct McpResourceTemplate {
93 #[serde(rename = "uriTemplate")]
95 pub uri_template: String,
96 pub name: String,
98 pub description: Option<String>,
100 #[serde(rename = "mimeType")]
102 pub mime_type: Option<String>,
103 pub server_name: String,
105}
106
107impl McpResourceTemplate {
108 pub fn new(
110 uri_template: impl Into<String>,
111 name: impl Into<String>,
112 server_name: impl Into<String>,
113 ) -> Self {
114 Self {
115 uri_template: uri_template.into(),
116 name: name.into(),
117 description: None,
118 mime_type: None,
119 server_name: server_name.into(),
120 }
121 }
122
123 pub fn expand(&self, params: &HashMap<String, String>) -> String {
127 let mut result = self.uri_template.clone();
128 for (key, value) in params {
129 let placeholder = format!("{{{}}}", key);
130 result = result.replace(&placeholder, value);
131 }
132 result
133 }
134
135 pub fn get_parameters(&self) -> Vec<String> {
137 let mut params = Vec::new();
138 let mut chars = self.uri_template.chars().peekable();
139
140 while let Some(c) = chars.next() {
141 if c == '{' {
142 let mut param = String::new();
143 while let Some(&next) = chars.peek() {
144 if next == '}' {
145 chars.next();
146 break;
147 }
148 param.push(chars.next().unwrap());
149 }
150 if !param.is_empty() {
151 params.push(param);
152 }
153 }
154 }
155 params
156 }
157}
158
159#[derive(Debug, Clone, Serialize, Deserialize)]
161pub struct ResourceContent {
162 pub uri: String,
164 pub text: Option<String>,
166 pub blob: Option<String>,
168 #[serde(rename = "mimeType")]
170 pub mime_type: Option<String>,
171}
172
173impl ResourceContent {
174 pub fn text(uri: impl Into<String>, text: impl Into<String>) -> Self {
176 Self {
177 uri: uri.into(),
178 text: Some(text.into()),
179 blob: None,
180 mime_type: Some("text/plain".to_string()),
181 }
182 }
183
184 pub fn blob(
186 uri: impl Into<String>,
187 blob: impl Into<String>,
188 mime_type: impl Into<String>,
189 ) -> Self {
190 Self {
191 uri: uri.into(),
192 text: None,
193 blob: Some(blob.into()),
194 mime_type: Some(mime_type.into()),
195 }
196 }
197
198 pub fn is_text(&self) -> bool {
200 self.text.is_some()
201 }
202
203 pub fn is_blob(&self) -> bool {
205 self.blob.is_some()
206 }
207}
208
209#[derive(Debug, Clone)]
211pub enum ResourceEvent {
212 Changed { uri: String, server_name: String },
214 Deleted { uri: String, server_name: String },
216 Subscribed { uri: String, server_name: String },
218 Unsubscribed { uri: String, server_name: String },
220}
221
222#[derive(Debug, Clone)]
224pub struct ResourceCacheEntry {
225 pub content: ResourceContent,
227 pub cached_at: DateTime<Utc>,
229 pub ttl: Duration,
231}
232
233impl ResourceCacheEntry {
234 pub fn is_valid(&self) -> bool {
236 let age = Utc::now() - self.cached_at;
237 age.num_milliseconds() < self.ttl.as_millis() as i64
238 }
239}
240
241#[derive(Debug, Clone)]
243struct SubscriptionInfo {
244 uri: String,
246 server_name: String,
248 #[allow(dead_code)]
250 subscribed_at: DateTime<Utc>,
251}
252
253#[async_trait]
257pub trait ResourceManager: Send + Sync {
258 async fn list_resources(&self, server_name: Option<&str>) -> McpResult<Vec<McpResource>>;
262
263 async fn list_templates(
265 &self,
266 server_name: Option<&str>,
267 ) -> McpResult<Vec<McpResourceTemplate>>;
268
269 async fn read_resource(&self, server_name: &str, uri: &str) -> McpResult<ResourceContent>;
271
272 async fn read_resource_cached(
276 &self,
277 server_name: &str,
278 uri: &str,
279 ) -> McpResult<ResourceContent>;
280
281 async fn subscribe(&self, server_name: &str, uri: &str) -> McpResult<()>;
283
284 async fn unsubscribe(&self, server_name: &str, uri: &str) -> McpResult<()>;
286
287 fn get_subscriptions(&self) -> Vec<(String, String)>;
289
290 fn clear_cache(&self, server_name: Option<&str>);
292
293 fn invalidate_cache(&self, uri: &str);
295
296 fn subscribe_events(&self) -> mpsc::Receiver<ResourceEvent>;
298
299 fn expand_template(
301 &self,
302 template: &McpResourceTemplate,
303 params: &HashMap<String, String>,
304 ) -> String;
305}
306
307pub struct McpResourceManager<C: ConnectionManager> {
309 connection_manager: Arc<C>,
311 cache: Arc<RwLock<HashMap<String, ResourceCacheEntry>>>,
313 subscriptions: Arc<RwLock<HashMap<String, SubscriptionInfo>>>,
315 event_tx: Arc<RwLock<Option<mpsc::Sender<ResourceEvent>>>>,
317 default_cache_ttl: Duration,
319}
320
321impl<C: ConnectionManager> McpResourceManager<C> {
322 pub fn new(connection_manager: Arc<C>) -> Self {
324 Self {
325 connection_manager,
326 cache: Arc::new(RwLock::new(HashMap::new())),
327 subscriptions: Arc::new(RwLock::new(HashMap::new())),
328 event_tx: Arc::new(RwLock::new(None)),
329 default_cache_ttl: Duration::from_secs(300), }
331 }
332
333 pub fn with_cache_ttl(connection_manager: Arc<C>, cache_ttl: Duration) -> Self {
335 Self {
336 connection_manager,
337 cache: Arc::new(RwLock::new(HashMap::new())),
338 subscriptions: Arc::new(RwLock::new(HashMap::new())),
339 event_tx: Arc::new(RwLock::new(None)),
340 default_cache_ttl: cache_ttl,
341 }
342 }
343
344 pub fn cache_ttl(&self) -> Duration {
346 self.default_cache_ttl
347 }
348
349 pub fn set_cache_ttl(&mut self, ttl: Duration) {
351 self.default_cache_ttl = ttl;
352 }
353
354 async fn emit_event(&self, event: ResourceEvent) {
356 if let Some(tx) = self.event_tx.read().await.as_ref() {
357 let _ = tx.send(event).await;
358 }
359 }
360
361 fn cache_key(server_name: &str, uri: &str) -> String {
363 format!("{}:{}", server_name, uri)
364 }
365
366 async fn fetch_resources_from_server(&self, server_name: &str) -> McpResult<Vec<McpResource>> {
368 let connection = self
370 .connection_manager
371 .get_connection_by_server(server_name)
372 .ok_or_else(|| {
373 McpError::connection(format!("No connection found for server: {}", server_name))
374 })?;
375
376 let request = McpRequest::new(
378 serde_json::json!(format!("resources-list-{}", Uuid::new_v4())),
379 "resources/list",
380 );
381
382 let response = self
383 .connection_manager
384 .send(&connection.id, request)
385 .await?;
386
387 let result = response.into_result()?;
389
390 let resources_value = result
392 .get("resources")
393 .ok_or_else(|| McpError::protocol("Response missing 'resources' field"))?;
394
395 let raw_resources: Vec<serde_json::Value> = serde_json::from_value(resources_value.clone())
396 .map_err(|e| McpError::protocol(format!("Failed to parse resources: {}", e)))?;
397
398 let resources: Vec<McpResource> = raw_resources
400 .into_iter()
401 .filter_map(|r| {
402 let uri = r.get("uri")?.as_str()?.to_string();
403 let name = r.get("name")?.as_str()?.to_string();
404 let description = r
405 .get("description")
406 .and_then(|d| d.as_str())
407 .map(String::from);
408 let mime_type = r.get("mimeType").and_then(|m| m.as_str()).map(String::from);
409
410 Some(McpResource {
411 uri,
412 name,
413 description,
414 mime_type,
415 server_name: server_name.to_string(),
416 })
417 })
418 .collect();
419
420 Ok(resources)
421 }
422
423 async fn fetch_templates_from_server(
425 &self,
426 server_name: &str,
427 ) -> McpResult<Vec<McpResourceTemplate>> {
428 let connection = self
430 .connection_manager
431 .get_connection_by_server(server_name)
432 .ok_or_else(|| {
433 McpError::connection(format!("No connection found for server: {}", server_name))
434 })?;
435
436 let request = McpRequest::new(
438 serde_json::json!(format!("templates-list-{}", Uuid::new_v4())),
439 "resources/templates/list",
440 );
441
442 let response = self
443 .connection_manager
444 .send(&connection.id, request)
445 .await?;
446
447 let result = response.into_result()?;
449
450 let templates_value = result
452 .get("resourceTemplates")
453 .ok_or_else(|| McpError::protocol("Response missing 'resourceTemplates' field"))?;
454
455 let raw_templates: Vec<serde_json::Value> = serde_json::from_value(templates_value.clone())
456 .map_err(|e| McpError::protocol(format!("Failed to parse templates: {}", e)))?;
457
458 let templates: Vec<McpResourceTemplate> = raw_templates
460 .into_iter()
461 .filter_map(|t| {
462 let uri_template = t.get("uriTemplate")?.as_str()?.to_string();
463 let name = t.get("name")?.as_str()?.to_string();
464 let description = t
465 .get("description")
466 .and_then(|d| d.as_str())
467 .map(String::from);
468 let mime_type = t.get("mimeType").and_then(|m| m.as_str()).map(String::from);
469
470 Some(McpResourceTemplate {
471 uri_template,
472 name,
473 description,
474 mime_type,
475 server_name: server_name.to_string(),
476 })
477 })
478 .collect();
479
480 Ok(templates)
481 }
482
483 pub async fn handle_resource_changed(&self, server_name: &str, uri: &str) {
485 let cache_key = Self::cache_key(server_name, uri);
487 {
488 let mut cache = self.cache.write().await;
489 cache.remove(&cache_key);
490 }
491
492 self.emit_event(ResourceEvent::Changed {
494 uri: uri.to_string(),
495 server_name: server_name.to_string(),
496 })
497 .await;
498 }
499}
500
501#[async_trait]
502impl<C: ConnectionManager + 'static> ResourceManager for McpResourceManager<C> {
503 async fn list_resources(&self, server_name: Option<&str>) -> McpResult<Vec<McpResource>> {
504 match server_name {
505 Some(name) => self.fetch_resources_from_server(name).await,
506 None => {
507 let connections = self.connection_manager.get_all_connections();
509 let mut all_resources = Vec::new();
510
511 for conn in connections {
512 match self.fetch_resources_from_server(&conn.server_name).await {
513 Ok(resources) => all_resources.extend(resources),
514 Err(e) => {
515 tracing::warn!(
516 "Failed to list resources from server {}: {}",
517 conn.server_name,
518 e
519 );
520 }
521 }
522 }
523
524 Ok(all_resources)
525 }
526 }
527 }
528
529 async fn list_templates(
530 &self,
531 server_name: Option<&str>,
532 ) -> McpResult<Vec<McpResourceTemplate>> {
533 match server_name {
534 Some(name) => self.fetch_templates_from_server(name).await,
535 None => {
536 let connections = self.connection_manager.get_all_connections();
538 let mut all_templates = Vec::new();
539
540 for conn in connections {
541 match self.fetch_templates_from_server(&conn.server_name).await {
542 Ok(templates) => all_templates.extend(templates),
543 Err(e) => {
544 tracing::warn!(
545 "Failed to list templates from server {}: {}",
546 conn.server_name,
547 e
548 );
549 }
550 }
551 }
552
553 Ok(all_templates)
554 }
555 }
556 }
557
558 async fn read_resource(&self, server_name: &str, uri: &str) -> McpResult<ResourceContent> {
559 let connection = self
561 .connection_manager
562 .get_connection_by_server(server_name)
563 .ok_or_else(|| {
564 McpError::connection(format!("No connection found for server: {}", server_name))
565 })?;
566
567 let request = McpRequest::with_params(
569 serde_json::json!(format!("resource-read-{}", Uuid::new_v4())),
570 "resources/read",
571 serde_json::json!({
572 "uri": uri
573 }),
574 );
575
576 let response = self
577 .connection_manager
578 .send(&connection.id, request)
579 .await?;
580
581 let result = response.into_result()?;
583
584 let contents_value = result
586 .get("contents")
587 .ok_or_else(|| McpError::protocol("Response missing 'contents' field"))?;
588
589 let contents: Vec<serde_json::Value> = serde_json::from_value(contents_value.clone())
590 .map_err(|e| McpError::protocol(format!("Failed to parse contents: {}", e)))?;
591
592 let content = contents
594 .into_iter()
595 .next()
596 .ok_or_else(|| McpError::protocol("Empty contents array"))?;
597
598 let resource_uri = content
599 .get("uri")
600 .and_then(|u| u.as_str())
601 .unwrap_or(uri)
602 .to_string();
603 let text = content
604 .get("text")
605 .and_then(|t| t.as_str())
606 .map(String::from);
607 let blob = content
608 .get("blob")
609 .and_then(|b| b.as_str())
610 .map(String::from);
611 let mime_type = content
612 .get("mimeType")
613 .and_then(|m| m.as_str())
614 .map(String::from);
615
616 Ok(ResourceContent {
617 uri: resource_uri,
618 text,
619 blob,
620 mime_type,
621 })
622 }
623
624 async fn read_resource_cached(
625 &self,
626 server_name: &str,
627 uri: &str,
628 ) -> McpResult<ResourceContent> {
629 let cache_key = Self::cache_key(server_name, uri);
630
631 {
633 let cache = self.cache.read().await;
634 if let Some(entry) = cache.get(&cache_key) {
635 if entry.is_valid() {
636 return Ok(entry.content.clone());
637 }
638 }
639 }
640
641 let content = self.read_resource(server_name, uri).await?;
643
644 {
646 let mut cache = self.cache.write().await;
647 cache.insert(
648 cache_key,
649 ResourceCacheEntry {
650 content: content.clone(),
651 cached_at: Utc::now(),
652 ttl: self.default_cache_ttl,
653 },
654 );
655 }
656
657 Ok(content)
658 }
659
660 async fn subscribe(&self, server_name: &str, uri: &str) -> McpResult<()> {
661 let connection = self
663 .connection_manager
664 .get_connection_by_server(server_name)
665 .ok_or_else(|| {
666 McpError::connection(format!("No connection found for server: {}", server_name))
667 })?;
668
669 let request = McpRequest::with_params(
671 serde_json::json!(format!("resource-subscribe-{}", Uuid::new_v4())),
672 "resources/subscribe",
673 serde_json::json!({
674 "uri": uri
675 }),
676 );
677
678 self.connection_manager
679 .send(&connection.id, request)
680 .await?
681 .into_result()?;
682
683 let subscription_key = Self::cache_key(server_name, uri);
685 {
686 let mut subs = self.subscriptions.write().await;
687 subs.insert(
688 subscription_key,
689 SubscriptionInfo {
690 uri: uri.to_string(),
691 server_name: server_name.to_string(),
692 subscribed_at: Utc::now(),
693 },
694 );
695 }
696
697 self.emit_event(ResourceEvent::Subscribed {
699 uri: uri.to_string(),
700 server_name: server_name.to_string(),
701 })
702 .await;
703
704 Ok(())
705 }
706
707 async fn unsubscribe(&self, server_name: &str, uri: &str) -> McpResult<()> {
708 let connection = self
710 .connection_manager
711 .get_connection_by_server(server_name)
712 .ok_or_else(|| {
713 McpError::connection(format!("No connection found for server: {}", server_name))
714 })?;
715
716 let request = McpRequest::with_params(
718 serde_json::json!(format!("resource-unsubscribe-{}", Uuid::new_v4())),
719 "resources/unsubscribe",
720 serde_json::json!({
721 "uri": uri
722 }),
723 );
724
725 self.connection_manager
726 .send(&connection.id, request)
727 .await?
728 .into_result()?;
729
730 let subscription_key = Self::cache_key(server_name, uri);
732 {
733 let mut subs = self.subscriptions.write().await;
734 subs.remove(&subscription_key);
735 }
736
737 self.emit_event(ResourceEvent::Unsubscribed {
739 uri: uri.to_string(),
740 server_name: server_name.to_string(),
741 })
742 .await;
743
744 Ok(())
745 }
746
747 fn get_subscriptions(&self) -> Vec<(String, String)> {
748 self.subscriptions
749 .try_read()
750 .map(|subs| {
751 subs.values()
752 .map(|info| (info.server_name.clone(), info.uri.clone()))
753 .collect()
754 })
755 .unwrap_or_default()
756 }
757
758 fn clear_cache(&self, server_name: Option<&str>) {
759 let server_name_owned = server_name.map(|s| s.to_string());
760 let cache = self.cache.clone();
761 tokio::spawn(async move {
762 let mut cache = cache.write().await;
763 match server_name_owned {
764 Some(name) => {
765 let prefix = format!("{}:", name);
766 cache.retain(|k, _| !k.starts_with(&prefix));
767 }
768 None => {
769 cache.clear();
770 }
771 }
772 });
773 }
774
775 fn invalidate_cache(&self, uri: &str) {
776 let uri_owned = uri.to_string();
777 let cache = self.cache.clone();
778 tokio::spawn(async move {
779 let mut cache = cache.write().await;
780 cache.retain(|k, _| !k.ends_with(&format!(":{}", uri_owned)));
781 });
782 }
783
784 fn subscribe_events(&self) -> mpsc::Receiver<ResourceEvent> {
785 let (tx, rx) = mpsc::channel(100);
786 let event_tx = self.event_tx.clone();
787 tokio::spawn(async move {
788 *event_tx.write().await = Some(tx);
789 });
790 rx
791 }
792
793 fn expand_template(
794 &self,
795 template: &McpResourceTemplate,
796 params: &HashMap<String, String>,
797 ) -> String {
798 template.expand(params)
799 }
800}
801
802#[cfg(test)]
803mod tests {
804 use super::*;
805
806 #[test]
807 fn test_mcp_resource_new() {
808 let resource = McpResource::new("file:///test.txt", "test.txt", "test-server");
809 assert_eq!(resource.uri, "file:///test.txt");
810 assert_eq!(resource.name, "test.txt");
811 assert_eq!(resource.server_name, "test-server");
812 assert!(resource.description.is_none());
813 assert!(resource.mime_type.is_none());
814 }
815
816 #[test]
817 fn test_mcp_resource_with_details() {
818 let resource = McpResource::with_details(
819 "file:///test.txt",
820 "test.txt",
821 "test-server",
822 Some("A test file".to_string()),
823 Some("text/plain".to_string()),
824 );
825 assert_eq!(resource.description, Some("A test file".to_string()));
826 assert_eq!(resource.mime_type, Some("text/plain".to_string()));
827 }
828
829 #[test]
830 fn test_resource_template_new() {
831 let template = McpResourceTemplate::new("file:///{path}", "File Template", "test-server");
832 assert_eq!(template.uri_template, "file:///{path}");
833 assert_eq!(template.name, "File Template");
834 }
835
836 #[test]
837 fn test_resource_template_expand() {
838 let template = McpResourceTemplate::new("file:///{path}", "File Template", "test-server");
839
840 let mut params = HashMap::new();
841 params.insert("path".to_string(), "documents/test.txt".to_string());
842
843 let expanded = template.expand(¶ms);
844 assert_eq!(expanded, "file:///documents/test.txt");
845 }
846
847 #[test]
848 fn test_resource_template_expand_multiple_params() {
849 let template = McpResourceTemplate::new(
850 "db://{database}/{table}",
851 "Database Template",
852 "test-server",
853 );
854
855 let mut params = HashMap::new();
856 params.insert("database".to_string(), "mydb".to_string());
857 params.insert("table".to_string(), "users".to_string());
858
859 let expanded = template.expand(¶ms);
860 assert_eq!(expanded, "db://mydb/users");
861 }
862
863 #[test]
864 fn test_resource_template_get_parameters() {
865 let template = McpResourceTemplate::new(
866 "db://{database}/{table}?filter={filter}",
867 "Database Template",
868 "test-server",
869 );
870
871 let params = template.get_parameters();
872 assert_eq!(params.len(), 3);
873 assert!(params.contains(&"database".to_string()));
874 assert!(params.contains(&"table".to_string()));
875 assert!(params.contains(&"filter".to_string()));
876 }
877
878 #[test]
879 fn test_resource_template_expand_missing_param() {
880 let template = McpResourceTemplate::new("file:///{path}", "File Template", "test-server");
881
882 let params = HashMap::new(); let expanded = template.expand(¶ms);
885 assert_eq!(expanded, "file:///{path}");
887 }
888
889 #[test]
890 fn test_resource_content_text() {
891 let content = ResourceContent::text("file:///test.txt", "Hello, World!");
892 assert!(content.is_text());
893 assert!(!content.is_blob());
894 assert_eq!(content.text, Some("Hello, World!".to_string()));
895 assert_eq!(content.mime_type, Some("text/plain".to_string()));
896 }
897
898 #[test]
899 fn test_resource_content_blob() {
900 let content = ResourceContent::blob("file:///image.png", "base64data", "image/png");
901 assert!(!content.is_text());
902 assert!(content.is_blob());
903 assert_eq!(content.blob, Some("base64data".to_string()));
904 assert_eq!(content.mime_type, Some("image/png".to_string()));
905 }
906
907 #[test]
908 fn test_cache_key_generation() {
909 let key =
910 McpResourceManager::<crate::mcp::connection_manager::McpConnectionManager>::cache_key(
911 "server1",
912 "file:///test.txt",
913 );
914 assert_eq!(key, "server1:file:///test.txt");
915 }
916
917 #[test]
918 fn test_resource_cache_entry_validity() {
919 let entry = ResourceCacheEntry {
920 content: ResourceContent::text("file:///test.txt", "content"),
921 cached_at: Utc::now(),
922 ttl: Duration::from_secs(300),
923 };
924 assert!(entry.is_valid());
925
926 let expired_entry = ResourceCacheEntry {
928 content: ResourceContent::text("file:///test.txt", "content"),
929 cached_at: Utc::now() - chrono::Duration::seconds(400),
930 ttl: Duration::from_secs(300),
931 };
932 assert!(!expired_entry.is_valid());
933 }
934}