use async_trait::async_trait;
use pmcp::error::Result as PmcpResult;
#[cfg(feature = "resource-watcher")]
use pmcp::server::resource_watcher::{ResourceWatcher, ResourceWatcherBuilder};
use pmcp::server::{ResourceHandler, Server};
use pmcp::types::capabilities::ServerCapabilities;
use pmcp::types::{Content, ListResourcesResult, ReadResourceResult, ResourceInfo};
use pmcp::RequestHandlerExtra;
use std::collections::HashMap;
use std::path::PathBuf;
use std::sync::Arc;
use std::time::Duration;
use tokio::sync::{mpsc, RwLock};
use tracing::info;
/// Resource handler that exposes the files of a single directory as resources.
struct FileSystemResourceHandler {
    /// Directory that is scanned for resources and handed to the watcher builder.
    base_dir: PathBuf,

    /// Resources found by the most recent `scan_directory` call, keyed by URI.
    resources: Arc<RwLock<HashMap<String, ResourceInfo>>>,

    /// File watcher; `None` until `start_watching` has stored a started watcher.
    #[cfg(feature = "resource-watcher")]
    watcher: Arc<RwLock<Option<ResourceWatcher>>>,
}
impl FileSystemResourceHandler {
fn new(base_dir: PathBuf) -> Self {
Self {
base_dir,
resources: Arc::new(RwLock::new(HashMap::new())),
#[cfg(feature = "resource-watcher")]
watcher: Arc::new(RwLock::new(None)),
}
}
#[cfg(feature = "resource-watcher")]
#[allow(dead_code)]
async fn start_watching(
&self,
notification_tx: mpsc::Sender<pmcp::types::Notification>,
) -> PmcpResult<()> {
let (tx, mut rx) = mpsc::channel(100);
let notification_tx_clone = notification_tx.clone();
tokio::spawn(async move {
while let Some(server_notif) = rx.recv().await {
let notif = pmcp::types::Notification::Server(server_notif);
let _ = notification_tx_clone.send(notif).await;
}
});
let mut watcher = ResourceWatcherBuilder::new()
.base_dir(&self.base_dir)
.debounce(Duration::from_millis(500))
.pattern("**/*.txt")
.pattern("**/*.md")
.pattern("**/*.json")
.ignore("**/.*")
.ignore("**/node_modules/**")
.max_resources(1000)
.build(tx)?;
watcher.start().await?;
*self.watcher.write().await = Some(watcher);
info!("Started watching directory: {:?}", self.base_dir);
Ok(())
}
async fn scan_directory(&self) -> PmcpResult<Vec<ResourceInfo>> {
use std::fs;
let mut resources = Vec::new();
let mut resource_map = self.resources.write().await;
resource_map.clear();
if let Ok(entries) = fs::read_dir(&self.base_dir) {
for entry in entries.flatten() {
let path = entry.path();
if path.is_file() {
if let Some(name) = path.file_name().and_then(|n| n.to_str()) {
let mime_type = match path.extension().and_then(|e| e.to_str()) {
Some("txt") => Some("text/plain".to_string()),
Some("md") => Some("text/markdown".to_string()),
Some("json") => Some("application/json".to_string()),
_ => continue,
};
let uri = format!("file://{}", path.display());
let mut info = ResourceInfo::new(&uri, name)
.with_description(format!("File resource: {}", name));
if let Some(mime) = &mime_type {
info = info.with_mime_type(mime);
}
resources.push(info.clone());
resource_map.insert(uri.clone(), info.clone());
#[cfg(feature = "resource-watcher")]
if let Some(watcher) = &*self.watcher.read().await {
let _ = watcher.add_resource(uri, info).await;
}
}
}
}
}
Ok(resources)
}
}
#[async_trait]
impl ResourceHandler for FileSystemResourceHandler {
async fn read(&self, uri: &str, _extra: RequestHandlerExtra) -> PmcpResult<ReadResourceResult> {
let path = if let Some(path_str) = uri.strip_prefix("file://") {
PathBuf::from(path_str)
} else {
return Err(pmcp::error::Error::not_found("Invalid URI scheme"));
};
let content = tokio::fs::read_to_string(&path)
.await
.map_err(|e| pmcp::error::Error::not_found(format!("Failed to read file: {}", e)))?;
let _mime_type = path
.extension()
.and_then(|e| e.to_str())
.and_then(|ext| match ext {
"txt" => Some("text/plain"),
"md" => Some("text/markdown"),
"json" => Some("application/json"),
_ => None,
})
.unwrap_or("text/plain");
Ok(ReadResourceResult::new(vec![Content::text(content)]))
}
async fn list(
&self,
_cursor: Option<String>,
_extra: RequestHandlerExtra,
) -> PmcpResult<ListResourcesResult> {
let resources = self.scan_directory().await?;
Ok(ListResourcesResult::new(resources))
}
}
/// Entry point: serves the files of a directory as MCP resources over stdio.
///
/// The directory is the first command-line argument, or the current working
/// directory when no argument is given.
///
/// # Errors
///
/// Returns an error when the current directory cannot be determined, when the
/// server cannot be built, or when the stdio transport fails.
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    // The server speaks its protocol over stdio, so logs go to stderr to keep
    // them out of the protocol stream on stdout.
    tracing_subscriber::fmt()
        .with_max_level(tracing::Level::INFO)
        .with_writer(std::io::stderr)
        .init();

    // `args_os` accepts non-UTF-8 paths, and `?` reports a missing working
    // directory as an error instead of panicking.
    let watch_dir = match std::env::args_os().nth(1) {
        Some(arg) => PathBuf::from(arg),
        None => std::env::current_dir()?,
    };

    info!("Starting resource watcher example");
    info!("Watching directory: {:?}", watch_dir);

    // NOTE(review): the handler is moved into the server without
    // `start_watching` ever being called, so no watcher is started here —
    // confirm whether that wiring is intentionally left out of the example.
    let handler = FileSystemResourceHandler::new(watch_dir);
    let server = Server::builder()
        .name("resource-watcher-example")
        .version("1.0.0")
        .capabilities(ServerCapabilities::resources_only())
        .resources(handler)
        .build()?;

    #[cfg(feature = "resource-watcher")]
    {
        info!("\nResourceWatcher features:");
        info!("- Monitors .txt, .md, and .json files");
        info!("- Sends notifications when files change");
        info!("- Debounces changes (500ms)");
        info!("- Ignores hidden files and node_modules");
        info!("\nTo test file watching:");
        info!("1. Create or modify .txt, .md, or .json files in the watched directory");
        info!("2. The server will send resource update notifications");
        info!("3. Clients subscribed to resources will receive updates");
    }

    #[cfg(not(feature = "resource-watcher"))]
    {
        info!("\nResourceWatcher is not enabled. To enable it, compile with:");
        info!(" cargo run --example 18_resource_watcher --features resource-watcher");
    }

    info!("\nStarting MCP server on stdio...");
    server.run_stdio().await?;
    Ok(())
}