use oauth_provider_rs::storage::create_dynamodb_storage;
use oauth_provider_rs::{CognitoOAuthConfig, OAuthProvider};
use remote_mcp_kernel::{
config::Config, error::AppResult, handlers::SseHandlerConfig, microkernel::MicrokernelServer,
};
use rmcp::{
Error as McpError, ServerHandler,
handler::server::router::tool::ToolRouter,
handler::server::tool::Parameters,
model::{CallToolResult, Content, Implementation, ServerCapabilities, ServerInfo},
tool, tool_handler, tool_router,
};
use schemars::JsonSchema;
use serde::{Deserialize, Serialize};
use std::env;
use tracing_subscriber::{layer::SubscriberExt, util::SubscriberInitExt};
#[derive(Debug, Clone)]
pub struct CustomMcpServer {
tool_router: ToolRouter<Self>,
name: String,
}
#[derive(Debug, Deserialize, Serialize, JsonSchema)]
pub struct ListFilesRequest {
#[schemars(description = "Directory path to list files from")]
pub path: String,
#[schemars(description = "Include hidden files")]
pub include_hidden: Option<bool>,
}
#[derive(Debug, Deserialize, Serialize, JsonSchema)]
pub struct ReadFileRequest {
#[schemars(description = "File path to read")]
pub path: String,
#[schemars(description = "Maximum number of lines to read")]
pub max_lines: Option<usize>,
}
#[derive(Debug, Deserialize, Serialize, JsonSchema)]
pub struct WriteFileRequest {
#[schemars(description = "File path to write to")]
pub path: String,
#[schemars(description = "Content to write")]
pub content: String,
#[schemars(description = "Append to file instead of overwriting")]
pub append: Option<bool>,
}
#[derive(Debug, Deserialize, Serialize, JsonSchema)]
pub struct WordCountRequest {
#[schemars(description = "Text to count words in")]
pub text: String,
}
#[derive(Debug, Deserialize, Serialize, JsonSchema)]
pub struct TextSearchRequest {
#[schemars(description = "Text to search in")]
pub text: String,
#[schemars(description = "Pattern to search for")]
pub pattern: String,
#[schemars(description = "Case sensitive search")]
pub case_sensitive: Option<bool>,
}
#[tool_router]
impl CustomMcpServer {
pub fn new(name: String) -> Self {
Self {
tool_router: Self::tool_router(),
name,
}
}
#[tool(description = "List files and directories in the specified path")]
async fn list_files(
&self,
Parameters(req): Parameters<ListFilesRequest>,
) -> Result<CallToolResult, McpError> {
let path = std::path::Path::new(&req.path);
if !path.exists() {
return Ok(CallToolResult::error(vec![Content::text(format!(
"Path does not exist: {}",
req.path
))]));
}
if !path.is_dir() {
return Ok(CallToolResult::error(vec![Content::text(format!(
"Path is not a directory: {}",
req.path
))]));
}
let mut files = Vec::new();
let include_hidden = req.include_hidden.unwrap_or(false);
match std::fs::read_dir(path) {
Ok(entries) => {
for entry in entries {
match entry {
Ok(entry) => {
let file_name = entry.file_name().to_string_lossy().to_string();
let is_hidden = file_name.starts_with('.');
if include_hidden || !is_hidden {
let file_type = if entry.path().is_dir() {
"directory"
} else {
"file"
};
files.push(format!("{} ({})", file_name, file_type));
}
}
Err(e) => {
return Ok(CallToolResult::error(vec![Content::text(format!(
"Error reading directory entry: {}",
e
))]));
}
}
}
}
Err(e) => {
return Ok(CallToolResult::error(vec![Content::text(format!(
"Error reading directory: {}",
e
))]));
}
}
files.sort();
let result = files.join("\n");
Ok(CallToolResult::success(vec![Content::text(result)]))
}
#[tool(description = "Read the contents of a file")]
async fn read_file(
&self,
Parameters(req): Parameters<ReadFileRequest>,
) -> Result<CallToolResult, McpError> {
let path = std::path::Path::new(&req.path);
if !path.exists() {
return Ok(CallToolResult::error(vec![Content::text(format!(
"File does not exist: {}",
req.path
))]));
}
if !path.is_file() {
return Ok(CallToolResult::error(vec![Content::text(format!(
"Path is not a file: {}",
req.path
))]));
}
match std::fs::read_to_string(path) {
Ok(content) => {
let result = if let Some(max_lines) = req.max_lines {
content
.lines()
.take(max_lines)
.collect::<Vec<_>>()
.join("\n")
} else {
content
};
Ok(CallToolResult::success(vec![Content::text(result)]))
}
Err(e) => Ok(CallToolResult::error(vec![Content::text(format!(
"Error reading file: {}",
e
))])),
}
}
#[tool(description = "Write content to a file")]
async fn write_file(
&self,
Parameters(req): Parameters<WriteFileRequest>,
) -> Result<CallToolResult, McpError> {
let path = std::path::Path::new(&req.path);
if let Some(parent) = path.parent() {
if !parent.exists() {
if let Err(e) = std::fs::create_dir_all(parent) {
return Ok(CallToolResult::error(vec![Content::text(format!(
"Error creating parent directories: {}",
e
))]));
}
}
}
let result = if req.append.unwrap_or(false) {
std::fs::write(path, &req.content)
} else {
std::fs::write(path, &req.content)
};
match result {
Ok(()) => Ok(CallToolResult::success(vec![Content::text(format!(
"Successfully wrote {} bytes to {}",
req.content.len(),
req.path
))])),
Err(e) => Ok(CallToolResult::error(vec![Content::text(format!(
"Error writing file: {}",
e
))])),
}
}
#[tool(description = "Get system information including CPU, memory, and disk usage")]
async fn get_system_info(&self) -> Result<CallToolResult, McpError> {
let mut info = Vec::new();
let now = std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.unwrap()
.as_secs();
info.push(format!("Timestamp: {}", now));
if let Ok(cwd) = std::env::current_dir() {
info.push(format!("Working Directory: {}", cwd.display()));
}
let env_count = std::env::vars().count();
info.push(format!("Environment Variables: {}", env_count));
info.push(format!("OS: {}", std::env::consts::OS));
info.push(format!("Architecture: {}", std::env::consts::ARCH));
let result = info.join("\n");
Ok(CallToolResult::success(vec![Content::text(result)]))
}
#[tool(description = "Count words, lines, and characters in text")]
async fn count_words(
&self,
Parameters(req): Parameters<WordCountRequest>,
) -> Result<CallToolResult, McpError> {
let text = &req.text;
let lines = text.lines().count();
let words = text.split_whitespace().count();
let chars = text.chars().count();
let bytes = text.len();
let result = format!(
"Lines: {}\nWords: {}\nCharacters: {}\nBytes: {}",
lines, words, chars, bytes
);
Ok(CallToolResult::success(vec![Content::text(result)]))
}
#[tool(description = "Search for patterns in text")]
async fn search_text(
&self,
Parameters(req): Parameters<TextSearchRequest>,
) -> Result<CallToolResult, McpError> {
let text = &req.text;
let pattern = &req.pattern;
let case_sensitive = req.case_sensitive.unwrap_or(false);
let (search_text, search_pattern) = if case_sensitive {
(text.to_string(), pattern.to_string())
} else {
(text.to_lowercase(), pattern.to_lowercase())
};
let mut matches = Vec::new();
for (line_num, line) in search_text.lines().enumerate() {
if line.contains(&search_pattern) {
matches.push(format!(
"Line {}: {}",
line_num + 1,
text.lines().nth(line_num).unwrap_or("")
));
}
}
let result = if matches.is_empty() {
format!("No matches found for pattern: {}", pattern)
} else {
format!("Found {} matches:\n{}", matches.len(), matches.join("\n"))
};
Ok(CallToolResult::success(vec![Content::text(result)]))
}
#[tool(description = "Get current date and time in various formats")]
async fn get_datetime(&self) -> Result<CallToolResult, McpError> {
let now = std::time::SystemTime::now();
let unix_timestamp = now.duration_since(std::time::UNIX_EPOCH).unwrap().as_secs();
let result = format!(
"Unix Timestamp: {}\nISO 8601 (approx): {}",
unix_timestamp,
chrono::DateTime::from_timestamp(unix_timestamp as i64, 0)
.unwrap_or_default()
.format("%Y-%m-%dT%H:%M:%SZ")
);
Ok(CallToolResult::success(vec![Content::text(result)]))
}
}
#[tool_handler]
impl ServerHandler for CustomMcpServer {
fn get_info(&self) -> ServerInfo {
ServerInfo {
protocol_version: Default::default(),
capabilities: ServerCapabilities::builder()
.enable_tools()
.build(),
server_info: Implementation {
name: self.name.clone(),
version: "1.0.0".to_string(),
},
instructions: Some("A custom MCP server with file operations, system utilities, and text processing tools".to_string()),
}
}
}
#[tokio::main]
async fn main() -> AppResult<()> {
dotenv::dotenv().ok();
let config = Config::from_env()?;
init_tracing(&config)?;
tracing::info!("Starting Custom MCP Server example with Cognito and DynamoDB storage...");
let cognito_config = CognitoOAuthConfig {
client_id: config.cognito.client_id.clone(),
client_secret: config.cognito.client_secret.clone().unwrap_or_default(),
redirect_uri: format!(
"http://{}:{}/oauth/callback",
config.server.host, config.server.port
),
scope: config.cognito.scope.clone(),
provider_name: "cognito".to_string(),
};
let table_name =
env::var("DYNAMODB_TABLE_NAME").unwrap_or_else(|_| "oauth-storage".to_string());
let create_table = env::var("DYNAMODB_CREATE_TABLE")
.unwrap_or_else(|_| "true".to_string())
.parse::<bool>()
.unwrap_or(true);
log_startup_info(&config, &table_name, create_table);
let (storage, client_manager) = create_dynamodb_storage(
table_name.clone(),
create_table,
Some("expires_at".to_string()),
)
.await
.map_err(|e| {
remote_mcp_kernel::error::AppError::Internal(format!(
"Failed to create DynamoDB storage: {}",
e
))
})?;
let oauth_handler = oauth_provider_rs::CognitoOAuthHandler::new_simple(
storage,
client_manager,
cognito_config,
config.cognito.cognito_domain.clone(),
config.cognito.region.clone(),
config.cognito.user_pool_id.clone(),
);
let oauth_provider = OAuthProvider::new(oauth_handler);
let custom_mcp_server = CustomMcpServer::new("Custom File & System MCP Server".to_string());
let microkernel = MicrokernelServer::new()
.with_oauth_provider(oauth_provider)
.with_mcp_streamable_handler(custom_mcp_server.clone())
.with_mcp_sse_handler(custom_mcp_server, SseHandlerConfig::default());
let bind_address = config.bind_socket_addr()?;
tracing::info!("🚀 Starting microkernel server on {}", bind_address);
microkernel.serve(bind_address).await?;
Ok(())
}
fn init_tracing(config: &Config) -> AppResult<()> {
tracing_subscriber::registry()
.with(
tracing_subscriber::EnvFilter::try_from_default_env()
.unwrap_or_else(|_| config.logging.level.as_str().into()),
)
.with(tracing_subscriber::fmt::layer())
.init();
Ok(())
}
fn log_startup_info(config: &Config, table_name: &str, create_table: bool) {
println!("🚀 Starting Custom MCP Server example with Cognito and DynamoDB storage...");
println!("📋 Configuration:");
println!(" - Architecture: Microkernel (independent handlers)");
println!(" - MCP Server: Custom implementation with specialized tools");
println!(" - OAuth Provider: AWS Cognito");
println!(" - Storage Backend: DynamoDB");
println!(" - Server: {}:{}", config.server.host, config.server.port);
println!(" - Version: {}", config.server.version);
println!();
println!("🔧 Custom MCP Server Tools:");
println!(" - list_files: List files and directories");
println!(" - read_file: Read file contents");
println!(" - write_file: Write content to files");
println!(" - get_system_info: Get system information");
println!(" - count_words: Count words, lines, and characters");
println!(" - search_text: Search for patterns in text");
println!(" - get_datetime: Get current date and time");
println!();
println!("🔐 AWS Cognito Configuration:");
println!(
" - Client ID: {}",
if config.cognito.client_id.is_empty() {
"Not configured"
} else {
"Configured"
}
);
println!(
" - Client Secret: {}",
match &config.cognito.client_secret {
Some(secret) if !secret.is_empty() => "Configured",
_ => "Not configured (Public Client)",
}
);
println!(
" - Domain: {}",
if config.cognito.cognito_domain.is_empty() {
"Not configured"
} else {
&config.cognito.cognito_domain
}
);
println!(
" - Region: {}",
if config.cognito.region.is_empty() {
"Not configured"
} else {
&config.cognito.region
}
);
println!(
" - User Pool ID: {}",
if config.cognito.user_pool_id.is_empty() {
"Not configured"
} else {
&config.cognito.user_pool_id
}
);
println!(" - Scopes: {}", config.cognito.scope);
println!();
println!("🗄️ DynamoDB Storage Configuration:");
println!(" - Table Name: {}", table_name);
println!(" - Auto-create Table: {}", create_table);
println!(" - TTL Attribute: expires_at");
println!();
println!("🔧 Handlers:");
println!(" - OAuth Provider (Cognito authentication & authorization)");
println!(" - Streamable HTTP Handler (MCP over HTTP with custom server)");
println!(" - SSE Handler (MCP over SSE with custom server)");
println!();
println!("🏗️ Microkernel Architecture:");
println!(" - Custom MCP server with specialized tools");
println!(" - Independent handlers that can operate standalone");
println!(" - Runtime composition of services");
println!(" - Single responsibility per handler");
println!(" - Easy testing and maintenance");
println!();
println!("🌐 MCP Protocol Endpoints:");
println!(
" - HTTP (streamable): http://{}:{}/mcp/http",
config.server.host, config.server.port
);
println!(
" - SSE: http://{}:{}/mcp/sse",
config.server.host, config.server.port
);
println!(
" - SSE Messages: http://{}:{}/mcp/message",
config.server.host, config.server.port
);
println!();
println!("🔐 OAuth 2.0 Endpoints:");
println!(
" - Authorization: https://{}/oauth2/authorize",
config.cognito.cognito_domain
);
println!(
" - Token: https://{}/oauth2/token",
config.cognito.cognito_domain
);
println!(
" - JWKS: https://{}/oauth2/jwks",
config.cognito.cognito_domain
);
println!(
" - UserInfo: https://{}/oauth2/userInfo",
config.cognito.cognito_domain
);
println!();
}