use crate::mcp::tools::agent_context_tools::IndexManager;
use crate::mcp_pmcp::agent_context_handlers::{
PmatFindSimilarHandler, PmatGetFunctionHandler, PmatIndexStatsHandler, PmatQueryCodeHandler,
};
use crate::mcp_pmcp::analyze_handlers::{
AnalyzeBigOTool, AnalyzeComplexityTool, AnalyzeDagTool, AnalyzeDeadCodeTool,
AnalyzeDeepContextTool, AnalyzeSatdTool,
};
use crate::mcp_pmcp::context_handlers::{GenerateContextTool, GitTool, ScaffoldProjectTool};
use crate::mcp_pmcp::handlers::{
RefactorGetStateTool, RefactorNextIterationTool, RefactorStartTool, RefactorStopTool,
};
use crate::mcp_pmcp::pdmt_handler::PdmtTool;
use crate::mcp_pmcp::quality_handlers::QualityGateTool;
use crate::mcp_pmcp::quality_proxy_handler::QualityProxyTool;
use crate::mcp_server::state_manager::StateManager;
use async_trait::async_trait;
use pmcp::shared::{StdioTransport, Transport, TransportMessage};
use pmcp::{Server, ServerCapabilities};
use std::path::PathBuf;
use std::sync::Arc;
use tokio::sync::{oneshot, Mutex};
use tracing::info;
/// Transport wrapper that pairs an inner [`Transport`] with a one-shot
/// "session ended" notification.
///
/// All `Transport` calls are forwarded to `inner`; the sender below is consumed
/// the first time `receive` returns an error (see the `Transport` impl).
#[derive(Debug)]
struct EofSignalingTransport<T: Transport> {
/// The wrapped transport that every call is delegated to.
inner: T,
/// Sending half of the session-end channel; `None` once the signal was sent.
session_end_tx: Option<oneshot::Sender<String>>,
}
impl<T: Transport> EofSignalingTransport<T> {
    /// Wraps `inner` and returns the wrapper together with the receiving half
    /// of its session-end channel.
    ///
    /// The sending half is stored inside the wrapper; the returned receiver is
    /// the only way to observe the signal.
    fn new(inner: T) -> (Self, oneshot::Receiver<String>) {
        let (session_end_tx, session_end_rx) = oneshot::channel();
        let wrapper = Self {
            inner,
            session_end_tx: Some(session_end_tx),
        };
        (wrapper, session_end_rx)
    }
}
#[async_trait]
impl<T: Transport> Transport for EofSignalingTransport<T> {
    /// Forwards the outgoing message to the wrapped transport unchanged.
    async fn send(&mut self, message: TransportMessage) -> pmcp::Result<()> {
        self.inner.send(message).await
    }

    /// Receives from the wrapped transport and returns its result unchanged.
    ///
    /// On the first error (any error, not only EOF) the error's display text is
    /// sent on the session-end channel. Later errors are still returned but no
    /// longer signalled, because the sender has already been taken.
    async fn receive(&mut self) -> pmcp::Result<TransportMessage> {
        match self.inner.receive().await {
            Ok(message) => Ok(message),
            Err(error) => {
                if let Some(notifier) = self.session_end_tx.take() {
                    // A dropped receiver is not an error for the transport itself.
                    let _ = notifier.send(error.to_string());
                }
                Err(error)
            }
        }
    }

    /// Closes the wrapped transport; does not touch the session-end channel.
    async fn close(&mut self) -> pmcp::Result<()> {
        self.inner.close().await
    }

    /// Reports the wrapped transport's connection state.
    fn is_connected(&self) -> bool {
        self.inner.is_connected()
    }

    /// Reports the wrapped transport's type label.
    fn transport_type(&self) -> &'static str {
        self.inner.transport_type()
    }
}
/// MCP server that registers the PMAT tool handlers and serves them over stdio
/// (see `run`).
pub struct SimpleUnifiedServer {
/// Shared state behind an async mutex; a clone of this handle is passed to
/// each `refactor.*` tool when the server is built.
state_manager: Arc<Mutex<StateManager>>,
}
impl SimpleUnifiedServer {
    /// Creates a server holding a freshly constructed [`StateManager`].
    ///
    /// # Errors
    ///
    /// This implementation always returns `Ok`; the `Result` exists only in
    /// the signature.
    #[provable_contracts_macros::contract("pmat-core.yaml", equation = "check_compliance")]
    pub fn new() -> Result<Self, Box<dyn std::error::Error>> {
        let state_manager = Arc::new(Mutex::new(StateManager::new()));
        Ok(Self { state_manager })
    }

    /// Builds the pmcp server with all 20 tools and serves it over stdio.
    ///
    /// The stdio transport is wrapped in [`EofSignalingTransport`], and the
    /// method returns as soon as either `Server::run` finishes or the wrapper
    /// reports a receive error. In the latter case stdout is flushed and the
    /// reason is logged before returning `Ok(())`.
    ///
    /// # Errors
    ///
    /// Propagates a failure from the server builder or from `Server::run`.
    #[provable_contracts_macros::contract("pmat-core.yaml", equation = "check_compliance")]
    pub async fn run(&self) -> Result<(), Box<dyn std::error::Error>> {
        info!("Starting PMAT Simple Unified MCP server (pmcp SDK)");

        // Fall back to "." when the working directory cannot be determined.
        let project_root = std::env::current_dir().unwrap_or_else(|_| PathBuf::from("."));
        let index_manager = Arc::new(IndexManager::new(project_root));

        let server = Server::builder()
            .name("paiml-mcp-agent-toolkit")
            .version(env!("CARGO_PKG_VERSION"))
            .capabilities(ServerCapabilities::tools_only())
            // Analysis tools.
            .tool("analyze_complexity", AnalyzeComplexityTool)
            .tool("analyze_satd", AnalyzeSatdTool)
            .tool("analyze_dead_code", AnalyzeDeadCodeTool)
            .tool("analyze_dag", AnalyzeDagTool)
            .tool("analyze_deep_context", AnalyzeDeepContextTool)
            .tool("analyze_big_o", AnalyzeBigOTool)
            // Refactor tools share the server's state manager.
            .tool(
                "refactor.start",
                RefactorStartTool::new(Arc::clone(&self.state_manager)),
            )
            .tool(
                "refactor.nextIteration",
                RefactorNextIterationTool::new(Arc::clone(&self.state_manager)),
            )
            .tool(
                "refactor.getState",
                RefactorGetStateTool::new(Arc::clone(&self.state_manager)),
            )
            .tool(
                "refactor.stop",
                RefactorStopTool::new(Arc::clone(&self.state_manager)),
            )
            // Quality, todo, git and context tools.
            .tool("quality_gate", QualityGateTool)
            .tool("quality_proxy", QualityProxyTool)
            .tool("pdmt_deterministic_todos", PdmtTool::new())
            .tool("git_operation", GitTool)
            .tool("generate_context", GenerateContextTool)
            .tool("scaffold_project", ScaffoldProjectTool)
            // Agent-context tools share one index manager.
            .tool(
                "pmat_query_code",
                PmatQueryCodeHandler::new(Arc::clone(&index_manager)),
            )
            .tool(
                "pmat_get_function",
                PmatGetFunctionHandler::new(Arc::clone(&index_manager)),
            )
            .tool(
                "pmat_find_similar",
                PmatFindSimilarHandler::new(Arc::clone(&index_manager)),
            )
            .tool(
                "pmat_index_stats",
                PmatIndexStatsHandler::new(Arc::clone(&index_manager)),
            )
            .build()?;
        info!("PMAT Simple Unified MCP server ready with 20 tools (16 core + 4 agent_context), listening on stdio");

        let (transport, session_end) = EofSignalingTransport::new(StdioTransport::new());
        tokio::select! {
            outcome = server.run(transport) => {
                outcome?;
            }
            signal = session_end => {
                // Best-effort flush of stdout before returning.
                let _ = std::io::Write::flush(&mut std::io::stdout());
                let reason = match signal {
                    Ok(text) => text,
                    // The sender was dropped without ever reporting an error.
                    Err(_) => "transport dropped".to_string(),
                };
                info!("MCP stdio session ended ({reason}); exiting");
            }
        }

        info!("PMAT Simple Unified MCP server shutting down");
        Ok(())
    }
}
impl Default for SimpleUnifiedServer {
    /// Equivalent to [`SimpleUnifiedServer::new`].
    ///
    /// # Panics
    ///
    /// Panics if `new()` returns an error.
    fn default() -> Self {
        let built = Self::new();
        built.expect("Failed to create simple unified server")
    }
}
#[cfg_attr(coverage_nightly, coverage(off))]
#[cfg(test)]
mod property_tests {
    use proptest::prelude::*;

    proptest! {
        // Smoke property: accepts any generated string and always passes.
        #[test]
        fn basic_property_stability(_input in ".*") {
            prop_assert!(true);
        }

        // Every value drawn from 0..1000 stays within the asserted bound.
        #[test]
        fn module_consistency_check(value in 0u32..1000) {
            prop_assert!(value <= 1000);
        }
    }
}
#[cfg_attr(coverage_nightly, coverage(off))]
#[cfg(test)]
mod active_tests {
    use super::*;

    /// `new()` returns `Ok`.
    #[test]
    fn test_simple_unified_server_new() {
        let result = SimpleUnifiedServer::new();
        assert!(result.is_ok());
    }

    /// `Default::default()` constructs a server without panicking.
    #[test]
    fn test_simple_unified_server_default() {
        let server = SimpleUnifiedServer::default();
        let _ = server;
    }

    /// Compile-time check that the server type is `Send`.
    #[test]
    fn test_server_is_send() {
        fn assert_send<T: Send>() {}
        assert_send::<SimpleUnifiedServer>();
    }

    /// Compile-time check that the server type is `Sync`.
    #[test]
    fn test_server_is_sync() {
        fn assert_sync<T: Sync>() {}
        assert_sync::<SimpleUnifiedServer>();
    }

    /// Guards against the server struct growing beyond 16 bytes.
    #[test]
    fn test_server_size() {
        let size = std::mem::size_of::<SimpleUnifiedServer>();
        assert!(
            size <= 16,
            "Server struct is larger than expected: {} bytes",
            size
        );
    }

    /// `new()` must not panic.
    ///
    /// The `catch_unwind` result is asserted; discarding it would let this
    /// test pass even when the constructor panics.
    #[test]
    fn test_new_does_not_panic() {
        let outcome = std::panic::catch_unwind(|| {
            let _ = SimpleUnifiedServer::new();
        });
        assert!(outcome.is_ok(), "SimpleUnifiedServer::new() panicked");
    }

    /// The state manager mutex can be locked on a fresh server.
    #[tokio::test]
    async fn test_state_manager_accessible() {
        let server = SimpleUnifiedServer::new().unwrap();
        let state = server.state_manager.lock().await;
        drop(state);
    }

    /// Locks the state through the server's own handle and then through a
    /// cloned handle, releasing the first guard before taking the second.
    #[tokio::test]
    async fn test_state_manager_thread_safety() {
        let server = SimpleUnifiedServer::new().unwrap();
        let state_clone = server.state_manager.clone();
        {
            let _state1 = server.state_manager.lock().await;
        }
        {
            let _state2 = state_clone.lock().await;
        }
    }

    /// Every tool listed here must advertise metadata whose name matches the
    /// registered name, a non-blank description, and an object-typed input
    /// schema with a `properties` map.
    #[test]
    fn test_all_20_live_tools_advertise_description_and_schema() {
        use pmcp::ToolHandler;
        let state_manager = Arc::new(Mutex::new(StateManager::new()));
        let index_manager = Arc::new(IndexManager::new(PathBuf::from(".")));
        // Tuple layout: (registered name, advertised metadata, takes_arguments).
        // When `takes_arguments` is true the schema's `properties` must be non-empty.
        let tools: Vec<(&str, Option<pmcp::types::ToolInfo>, bool)> = vec![
            ("analyze_complexity", AnalyzeComplexityTool.metadata(), true),
            ("analyze_satd", AnalyzeSatdTool.metadata(), true),
            ("analyze_dead_code", AnalyzeDeadCodeTool.metadata(), true),
            ("analyze_dag", AnalyzeDagTool.metadata(), true),
            (
                "analyze_deep_context",
                AnalyzeDeepContextTool.metadata(),
                true,
            ),
            ("analyze_big_o", AnalyzeBigOTool.metadata(), true),
            (
                "refactor.start",
                RefactorStartTool::new(state_manager.clone()).metadata(),
                true,
            ),
            (
                "refactor.nextIteration",
                RefactorNextIterationTool::new(state_manager.clone()).metadata(),
                false,
            ),
            (
                "refactor.getState",
                RefactorGetStateTool::new(state_manager.clone()).metadata(),
                false,
            ),
            (
                "refactor.stop",
                RefactorStopTool::new(state_manager).metadata(),
                false,
            ),
            ("quality_gate", QualityGateTool.metadata(), true),
            ("quality_proxy", QualityProxyTool.metadata(), true),
            ("pdmt_deterministic_todos", PdmtTool::new().metadata(), true),
            ("git_operation", GitTool.metadata(), true),
            ("generate_context", GenerateContextTool.metadata(), true),
            ("scaffold_project", ScaffoldProjectTool.metadata(), true),
            (
                "pmat_query_code",
                PmatQueryCodeHandler::new(index_manager.clone()).metadata(),
                true,
            ),
            (
                "pmat_get_function",
                PmatGetFunctionHandler::new(index_manager.clone()).metadata(),
                true,
            ),
            (
                "pmat_find_similar",
                PmatFindSimilarHandler::new(index_manager.clone()).metadata(),
                true,
            ),
            (
                "pmat_index_stats",
                PmatIndexStatsHandler::new(index_manager).metadata(),
                true,
            ),
        ];
        // This list is maintained by hand alongside the registrations in `run`.
        assert_eq!(
            tools.len(),
            20,
            "registry drift: update this test when tools are added or removed"
        );
        for (name, metadata, takes_arguments) in tools {
            let info = metadata.unwrap_or_else(|| {
                panic!(
                    "{name}: metadata() is None — tools/list would advertise \
                     an empty description and empty inputSchema"
                )
            });
            assert_eq!(
                info.name, name,
                "{name}: metadata name must match the registered tool name"
            );
            assert!(
                info.description
                    .as_deref()
                    .is_some_and(|d| !d.trim().is_empty()),
                "{name}: description must be non-empty"
            );
            let schema = info
                .input_schema
                .as_object()
                .unwrap_or_else(|| panic!("{name}: inputSchema must be a JSON object"));
            assert_eq!(
                schema.get("type").and_then(|t| t.as_str()),
                Some("object"),
                "{name}: inputSchema must declare type: object"
            );
            let properties = schema
                .get("properties")
                .and_then(|p| p.as_object())
                .unwrap_or_else(|| panic!("{name}: inputSchema must have a properties map"));
            if takes_arguments {
                assert!(
                    !properties.is_empty(),
                    "{name}: inputSchema.properties must be non-empty for a \
                     tool that takes arguments"
                );
            }
        }
    }

    /// Each `refactor.*` tool description must contain the phrase
    /// "simulated analysis engine"; missing metadata or a missing description
    /// is treated as an empty string and therefore fails the check.
    #[test]
    fn test_refactor_tool_descriptions_disclose_simulation() {
        use pmcp::ToolHandler;
        let state_manager = Arc::new(Mutex::new(StateManager::new()));
        let descriptions = [
            (
                "refactor.start",
                RefactorStartTool::new(state_manager.clone()).metadata(),
            ),
            (
                "refactor.nextIteration",
                RefactorNextIterationTool::new(state_manager.clone()).metadata(),
            ),
            (
                "refactor.getState",
                RefactorGetStateTool::new(state_manager.clone()).metadata(),
            ),
            (
                "refactor.stop",
                RefactorStopTool::new(state_manager).metadata(),
            ),
        ];
        for (name, metadata) in descriptions {
            let description = metadata
                .and_then(|info| info.description)
                .unwrap_or_default();
            assert!(
                description.contains("simulated analysis engine"),
                "{name}: description must disclose the simulated analysis \
                 engine, got: {description}"
            );
        }
    }
}
#[cfg_attr(coverage_nightly, coverage(off))]
#[cfg(test)]
mod eof_shutdown_tests {
    use super::*;
    use pmcp::error::TransportError;
    use pmcp::types::{Notification, ProgressNotification, ProgressToken};
    use std::collections::VecDeque;

    /// Test double whose `receive` replays a fixed script of results.
    ///
    /// Entries are handed out front-to-back, in the order they were written.
    /// Once the script is exhausted every further `receive` yields
    /// `TransportError::ConnectionClosed`.
    #[derive(Debug)]
    struct ScriptedTransport {
        receives: VecDeque<pmcp::Result<TransportMessage>>,
    }

    impl ScriptedTransport {
        /// Builds a transport that replays `script` in order.
        fn new(script: Vec<pmcp::Result<TransportMessage>>) -> Self {
            Self {
                receives: script.into(),
            }
        }
    }

    #[async_trait]
    impl Transport for ScriptedTransport {
        /// Accepts and discards every outgoing message.
        async fn send(&mut self, _message: TransportMessage) -> pmcp::Result<()> {
            Ok(())
        }

        /// Returns the next scripted result, or `ConnectionClosed` when none is left.
        async fn receive(&mut self) -> pmcp::Result<TransportMessage> {
            // pop_front keeps script order; popping from the back would replay
            // a multi-entry script in reverse.
            self.receives
                .pop_front()
                .unwrap_or_else(|| Err(TransportError::ConnectionClosed.into()))
        }

        /// Closing the scripted transport always succeeds.
        async fn close(&mut self) -> pmcp::Result<()> {
            Ok(())
        }
    }

    /// A progress notification used as an arbitrary valid message.
    fn progress_message() -> TransportMessage {
        TransportMessage::Notification(Notification::Progress(ProgressNotification::new(
            ProgressToken::String("test".to_string()),
            50.0,
            None,
        )))
    }

    /// A receive error is both returned to the caller and reported on the
    /// session-end channel.
    #[tokio::test]
    async fn eof_on_receive_signals_session_end_and_propagates_error() {
        let inner = ScriptedTransport::new(vec![]);
        let (mut transport, session_end) = EofSignalingTransport::new(inner);
        let result = transport.receive().await;
        assert!(result.is_err(), "EOF error must propagate to pmcp's reader");
        let reason = session_end
            .await
            .expect("session-end signal must fire on EOF");
        assert!(
            reason.contains("Connection closed"),
            "expected ConnectionClosed reason, got: {reason}"
        );
    }

    /// A successful receive leaves the session-end channel untouched.
    #[tokio::test]
    async fn successful_receive_does_not_signal_session_end() {
        let inner = ScriptedTransport::new(vec![Ok(progress_message())]);
        let (mut transport, mut session_end) = EofSignalingTransport::new(inner);
        let result = transport.receive().await;
        assert!(result.is_ok());
        assert!(
            session_end.try_recv().is_err(),
            "session-end must not fire on a successful receive"
        );
    }

    /// Sending never triggers the session-end signal.
    #[tokio::test]
    async fn send_does_not_signal_session_end() {
        let inner = ScriptedTransport::new(vec![]);
        let (mut transport, mut session_end) = EofSignalingTransport::new(inner);
        transport
            .send(progress_message())
            .await
            .expect("scripted send always succeeds");
        assert!(
            session_end.try_recv().is_err(),
            "session-end must not fire on send"
        );
    }

    /// Two consecutive receive errors must not panic, and the signal is still
    /// delivered.
    #[tokio::test]
    async fn session_end_signal_fires_at_most_once() {
        let inner = ScriptedTransport::new(vec![]);
        let (mut transport, session_end) = EofSignalingTransport::new(inner);
        let _ = transport.receive().await;
        let _ = transport.receive().await;
        assert!(session_end.await.is_ok());
    }

    /// Scripted entries are delivered in the order written, so the signal
    /// fires only when the scripted error is reached.
    #[tokio::test]
    async fn scripted_messages_are_replayed_in_order_before_eof() {
        let inner = ScriptedTransport::new(vec![
            Ok(progress_message()),
            Err(TransportError::ConnectionClosed.into()),
        ]);
        let (mut transport, mut session_end) = EofSignalingTransport::new(inner);
        assert!(
            transport.receive().await.is_ok(),
            "first scripted entry must be delivered first"
        );
        assert!(
            session_end.try_recv().is_err(),
            "session-end must not fire before the scripted error is reached"
        );
        assert!(transport.receive().await.is_err());
        assert!(session_end.await.is_ok());
    }

    /// `is_connected` and `transport_type` come from the inner transport.
    /// `ScriptedTransport` does not override them, so the expected values are
    /// whatever the `Transport` trait supplies by default.
    #[tokio::test]
    async fn wrapper_delegates_transport_metadata_to_inner() {
        let inner = ScriptedTransport::new(vec![]);
        let (transport, _session_end) = EofSignalingTransport::new(inner);
        assert!(transport.is_connected());
        assert_eq!(transport.transport_type(), "unknown");
    }
}
// Compiled only for test builds with the `broken-tests` feature enabled.
// NOTE(review): several tests below call `::new()` on tools that `run`
// registers as bare unit values, and assert type names that differ from the
// imported identifiers — presumably why this module is gated; confirm before
// enabling the feature.
#[cfg(all(test, feature = "broken-tests"))]
mod coverage_tests {
    use super::*;

    /// `new()` returns `Ok` and the value can be unwrapped.
    #[test]
    fn test_simple_unified_server_new() {
        let result = SimpleUnifiedServer::new();
        assert!(result.is_ok());
        let server = result.unwrap();
        let _ = server;
    }

    /// `Default::default()` constructs a server.
    #[test]
    fn test_simple_unified_server_default() {
        let server = SimpleUnifiedServer::default();
        let _ = server;
    }

    /// A constructed server has a non-zero size.
    #[test]
    fn test_simple_unified_server_state_manager_initialized() {
        let server = SimpleUnifiedServer::new().unwrap();
        assert!(std::mem::size_of_val(&server) > 0);
    }

    /// The `state_manager` field is reachable from this module.
    #[test]
    fn test_server_has_state_manager() {
        let server = SimpleUnifiedServer::new().unwrap();
        let _ = &server.state_manager;
    }

    /// Two servers can be constructed side by side.
    #[test]
    fn test_multiple_server_instances() {
        let server1 = SimpleUnifiedServer::new().unwrap();
        let server2 = SimpleUnifiedServer::new().unwrap();
        let _ = server1;
        let _ = server2;
    }

    /// Constructs each analysis tool via `new()`.
    // NOTE(review): assumes every analysis tool exposes `new()` — verify.
    #[test]
    fn test_analyze_tools_importable() {
        let _ = AnalyzeComplexityTool::new();
        let _ = AnalyzeSatdTool::new();
        let _ = AnalyzeDeadCodeTool::new();
        let _ = AnalyzeDagTool::new();
        let _ = AnalyzeDeepContextTool::new();
        let _ = AnalyzeBigOTool::new();
    }

    /// Each refactor tool is constructed from a shared state manager handle.
    #[test]
    fn test_refactor_tools_require_state_manager() {
        let state_manager = Arc::new(Mutex::new(StateManager::new()));
        let _ = RefactorStartTool::new(state_manager.clone());
        let _ = RefactorNextIterationTool::new(state_manager.clone());
        let _ = RefactorGetStateTool::new(state_manager.clone());
        let _ = RefactorStopTool::new(state_manager.clone());
    }

    /// Constructs the quality and todo tools via `new()`.
    #[test]
    fn test_quality_tools_importable() {
        let _ = QualityGateTool::new();
        let _ = QualityProxyTool::new();
        let _ = PdmtTool::new();
    }

    /// Constructs the git and context tools via `new()`.
    #[test]
    fn test_context_tools_importable() {
        let _ = GitTool::new();
        let _ = GenerateContextTool::new();
        let _ = ScaffoldProjectTool::new();
    }

    /// The pmcp builder accepts a name, version and capabilities.
    #[test]
    fn test_server_builder_pattern_accessible() {
        let builder = Server::builder()
            .name("test-server")
            .version("0.1.0")
            .capabilities(ServerCapabilities::tools_only());
        let _ = builder;
    }

    /// Only constructs the server; `run` is not invoked here.
    #[tokio::test]
    async fn test_server_run_requires_stdio() {
        let server = SimpleUnifiedServer::new().unwrap();
        let _ = server;
    }

    /// The state manager mutex can be locked on a fresh server.
    #[tokio::test]
    async fn test_state_manager_accessible() {
        let server = SimpleUnifiedServer::new().unwrap();
        let state = server.state_manager.lock().await;
        drop(state);
    }

    /// Locks the state through the server's own handle and then through a
    /// cloned handle, releasing the first guard before taking the second.
    #[tokio::test]
    async fn test_state_manager_thread_safety() {
        let server = SimpleUnifiedServer::new().unwrap();
        let state_clone = server.state_manager.clone();
        {
            let _state1 = server.state_manager.lock().await;
        }
        {
            let _state2 = state_clone.lock().await;
        }
    }

    /// Checks that each tool's `type_name` contains an expected fragment.
    // NOTE(review): the last three fragments ("ContextGenerateTool",
    // "ContextSummaryTool", "GitStatusTool") do not match the identifiers
    // imported here (GenerateContextTool, ScaffoldProjectTool, GitTool);
    // unless those are re-exports of differently named types, these
    // assertions look like they would fail — confirm.
    #[test]
    fn test_all_tool_types_accessible() {
        assert!(std::any::type_name::<AnalyzeComplexityTool>().contains("ComplexityTool"));
        assert!(std::any::type_name::<AnalyzeSatdTool>().contains("SatdTool"));
        assert!(std::any::type_name::<AnalyzeDeadCodeTool>().contains("DeadCodeTool"));
        assert!(std::any::type_name::<AnalyzeDagTool>().contains("AnalyzeDagTool"));
        assert!(std::any::type_name::<AnalyzeDeepContextTool>().contains("AnalyzeDeepContextTool"));
        assert!(std::any::type_name::<AnalyzeBigOTool>().contains("AnalyzeBigOTool"));
        assert!(std::any::type_name::<QualityGateTool>().contains("QualityGateTool"));
        assert!(std::any::type_name::<QualityProxyTool>().contains("QualityProxyTool"));
        assert!(std::any::type_name::<GenerateContextTool>().contains("ContextGenerateTool"));
        assert!(std::any::type_name::<ScaffoldProjectTool>().contains("ContextSummaryTool"));
        assert!(std::any::type_name::<GitTool>().contains("GitStatusTool"));
    }

    /// `tools_only()` sets the tools capability and leaves `list_changed` unset.
    #[test]
    fn test_server_capabilities_structure() {
        let capabilities = ServerCapabilities::tools_only();
        assert!(capabilities.tools.is_some());
        assert!(capabilities.tools.as_ref().unwrap().list_changed.is_none());
    }

    /// Compile-time check that the server type is `Send`.
    #[test]
    fn test_server_is_send() {
        fn assert_send<T: Send>() {}
        assert_send::<SimpleUnifiedServer>();
    }

    /// Compile-time check that the server type is `Sync`.
    #[test]
    fn test_server_is_sync() {
        fn assert_sync<T: Sync>() {}
        assert_sync::<SimpleUnifiedServer>();
    }

    /// Guards against the server struct growing beyond 16 bytes.
    #[test]
    fn test_server_size() {
        let size = std::mem::size_of::<SimpleUnifiedServer>();
        assert!(
            size <= 16,
            "Server struct is larger than expected: {} bytes",
            size
        );
    }

    /// `new()` must not panic.
    ///
    /// The `catch_unwind` result is asserted, matching
    /// `test_default_does_not_panic`; discarding it would make the test
    /// pass even when the constructor panics.
    #[test]
    fn test_new_does_not_panic() {
        let result = std::panic::catch_unwind(|| {
            let _ = SimpleUnifiedServer::new();
        });
        assert!(result.is_ok());
    }

    /// `default()` must not panic.
    #[test]
    fn test_default_does_not_panic() {
        let result = std::panic::catch_unwind(|| {
            let _ = SimpleUnifiedServer::default();
        });
        assert!(result.is_ok());
    }
}