use std::collections::HashMap;
use std::sync::{Arc, RwLock};
use std::time::Instant;
use devboy_core::{BuiltinToolsConfig, Provider};
use serde::Deserialize;
use serde_json::Value;
use tokio::sync::oneshot;
use crate::layered::{SessionPipeline, extract_file_path as file_path_from_args, is_mutating_tool};
use crate::protocol::{
InitializeParams, InitializeResult, JsonRpcError, JsonRpcRequest, JsonRpcResponse, MCP_VERSION,
RequestId, ServerCapabilities, ServerInfo, ToolCallParams, ToolCallResult, ToolsCapability,
ToolsListResult,
};
use crate::proxy::ProxyManager;
use crate::routing::{RoutingEngine, RoutingTarget};
use crate::telemetry::{TelemetryBuffer, TelemetryEvent, TelemetryStatus};
use crate::transport::{IncomingMessage, StdioTransport};
pub struct DeferredInit {
pub proxy_manager: ProxyManager,
pub builtin_tools_config: Option<BuiltinToolsConfig>,
pub routing_engine: Option<Arc<RoutingEngine>>,
}
pub struct McpServer {
contexts: HashMap<String, Vec<Arc<dyn Provider>>>,
knowledge_base_contexts: HashMap<String, Vec<Arc<dyn devboy_core::KnowledgeBaseProvider>>>,
messenger_contexts: HashMap<String, Vec<Arc<dyn devboy_core::MessengerProvider>>>,
active_context: RwLock<String>,
initialized: bool,
proxy_manager: ProxyManager,
builtin_tools_config: BuiltinToolsConfig,
meeting_providers: Vec<Arc<dyn devboy_core::MeetingNotesProvider>>,
routing_engine: Option<Arc<RoutingEngine>>,
telemetry: Option<TelemetryBuffer>,
deferred_init: Option<oneshot::Receiver<DeferredInit>>,
layered_pipeline: Option<SessionPipeline>,
}
impl McpServer {
pub fn new() -> Self {
let mut contexts = HashMap::new();
contexts.insert("default".to_string(), Vec::new());
let mut knowledge_base_contexts = HashMap::new();
knowledge_base_contexts.insert("default".to_string(), Vec::new());
let mut messenger_contexts = HashMap::new();
messenger_contexts.insert("default".to_string(), Vec::new());
Self {
contexts,
knowledge_base_contexts,
messenger_contexts,
active_context: RwLock::new("default".to_string()),
initialized: false,
proxy_manager: ProxyManager::new(),
builtin_tools_config: BuiltinToolsConfig::default(),
meeting_providers: Vec::new(),
routing_engine: None,
telemetry: None,
deferred_init: None,
layered_pipeline: None,
}
}
pub fn enable_layered_pipeline(&mut self, pipeline: SessionPipeline) {
self.layered_pipeline = Some(pipeline);
tracing::info!(
"Paper 2 layered pipeline enabled — L0 dedup active. \
Edit ~/.devboy/pipeline_config.toml (or set DEVBOY_PIPELINE_CONFIG) \
to tune knobs. See `devboy tune analyze` for split-savings metrics."
);
}
pub fn on_compaction_boundary(&self) {
if let Some(p) = &self.layered_pipeline {
p.on_compaction_boundary();
}
}
pub fn set_routing_engine(&mut self, engine: Arc<RoutingEngine>) {
self.routing_engine = Some(engine);
}
pub fn set_telemetry(&mut self, buffer: TelemetryBuffer) {
self.telemetry = Some(buffer);
}
pub fn set_builtin_tools_config(
&mut self,
config: BuiltinToolsConfig,
) -> devboy_core::Result<()> {
config.validate()?;
self.builtin_tools_config = config;
Ok(())
}
pub fn set_proxy_manager(&mut self, proxy_manager: ProxyManager) {
self.proxy_manager = proxy_manager;
}
pub fn set_deferred_init(&mut self, receiver: oneshot::Receiver<DeferredInit>) {
self.deferred_init = Some(receiver);
}
async fn resolve_deferred_init(&mut self) {
if let Some(receiver) = self.deferred_init.take() {
match receiver.await {
Ok(init) => {
if !init.proxy_manager.is_empty() {
self.proxy_manager = init.proxy_manager;
}
if let Some(bt_config) = init.builtin_tools_config
&& !bt_config.is_empty()
{
if let Err(e) = bt_config.validate() {
tracing::warn!("Remote builtin_tools config is invalid, ignoring: {e}");
} else {
self.builtin_tools_config = bt_config;
}
}
if let Some(engine) = init.routing_engine {
self.routing_engine = Some(engine);
}
}
Err(_) => {
tracing::warn!("Deferred initialization was cancelled");
}
}
}
}
pub fn add_meeting_provider(&mut self, provider: Arc<dyn devboy_core::MeetingNotesProvider>) {
self.meeting_providers.push(provider);
}
pub fn add_knowledge_base_provider(
&mut self,
provider: Arc<dyn devboy_core::KnowledgeBaseProvider>,
) {
self.add_knowledge_base_provider_to_context("default", provider);
}
pub fn add_knowledge_base_provider_to_context(
&mut self,
context: &str,
provider: Arc<dyn devboy_core::KnowledgeBaseProvider>,
) {
self.contexts.entry(context.to_string()).or_default();
self.knowledge_base_contexts
.entry(context.to_string())
.or_default()
.push(provider);
}
pub fn add_messenger_provider(&mut self, provider: Arc<dyn devboy_core::MessengerProvider>) {
self.add_messenger_provider_to_context("default", provider);
}
pub fn add_messenger_provider_to_context(
&mut self,
context: &str,
provider: Arc<dyn devboy_core::MessengerProvider>,
) {
self.contexts.entry(context.to_string()).or_default();
self.messenger_contexts
.entry(context.to_string())
.or_default()
.push(provider);
}
pub fn add_provider(&mut self, provider: Arc<dyn Provider>) {
self.contexts
.entry("default".to_string())
.or_default()
.push(provider);
}
pub fn add_provider_to_context(&mut self, context: &str, provider: Arc<dyn Provider>) {
self.contexts
.entry(context.to_string())
.or_default()
.push(provider);
}
pub fn ensure_context(&mut self, context: &str) {
self.contexts.entry(context.to_string()).or_default();
self.knowledge_base_contexts
.entry(context.to_string())
.or_default();
self.messenger_contexts
.entry(context.to_string())
.or_default();
}
pub fn set_active_context(&self, context: &str) -> devboy_core::Result<()> {
if !self.contexts.contains_key(context) {
return Err(devboy_core::Error::Config(format!(
"Context '{}' not found",
context
)));
}
let mut active = self
.active_context
.write()
.map_err(|_| devboy_core::Error::Config("Active context lock poisoned".to_string()))?;
*active = context.to_string();
Ok(())
}
pub fn active_context_name(&self) -> String {
self.active_context
.read()
.map(|g| g.clone())
.unwrap_or_else(|_| "default".to_string())
}
pub fn context_names(&self) -> Vec<String> {
let mut names: Vec<String> = self.contexts.keys().cloned().collect();
names.sort();
names
}
pub fn active_providers(&self) -> Vec<Arc<dyn Provider>> {
let active = self.active_context_name();
self.contexts.get(&active).cloned().unwrap_or_default()
}
pub fn active_knowledge_base_providers(
&self,
) -> Vec<Arc<dyn devboy_core::KnowledgeBaseProvider>> {
let active = self.active_context_name();
self.knowledge_base_contexts
.get(&active)
.cloned()
.unwrap_or_default()
}
pub fn active_messenger_providers(&self) -> Vec<Arc<dyn devboy_core::MessengerProvider>> {
let active = self.active_context_name();
self.messenger_contexts
.get(&active)
.cloned()
.unwrap_or_default()
}
pub fn providers(&self) -> &[Arc<dyn Provider>] {
self.contexts
.get("default")
.map(Vec::as_slice)
.unwrap_or(&[])
}
pub async fn run(&mut self) -> devboy_core::Result<()> {
tracing::info!(
"Starting MCP server with {} contexts (active: {})",
self.contexts.len(),
self.active_context_name()
);
let mut transport = StdioTransport::stdio();
loop {
match transport.read_message() {
Ok(Some(msg)) => {
let response = self.handle_message(msg).await;
if let Some(resp) = response
&& let Err(e) = transport.write_response(&resp)
{
tracing::error!("Failed to write response: {}", e);
break;
}
}
Ok(None) => {
tracing::info!("EOF received, shutting down");
break;
}
Err(e) => {
tracing::error!("Transport error: {}", e);
let error_resp = JsonRpcResponse::error(
RequestId::Null,
JsonRpcError::parse_error(&e.to_string()),
);
let _ = transport.write_response(&error_resp);
}
}
}
tracing::info!("MCP server stopped");
Ok(())
}
async fn handle_message(&mut self, msg: IncomingMessage) -> Option<JsonRpcResponse> {
match msg {
IncomingMessage::Request(req) => Some(self.handle_request(req).await),
IncomingMessage::Notification(notif) => {
self.handle_notification(¬if.method);
None }
}
}
pub async fn handle_request(&mut self, req: JsonRpcRequest) -> JsonRpcResponse {
tracing::debug!("Handling request: {} (id: {:?})", req.method, req.id);
match req.method.as_str() {
"initialize" => self.handle_initialize(req.id, req.params),
"tools/list" => {
self.resolve_deferred_init().await;
self.handle_tools_list(req.id)
}
"tools/call" => {
self.resolve_deferred_init().await;
self.handle_tools_call(req.id, req.params).await
}
"ping" => self.handle_ping(req.id),
method => {
tracing::warn!("Unknown method: {}", method);
JsonRpcResponse::error(req.id, JsonRpcError::method_not_found(method))
}
}
}
fn handle_notification(&mut self, method: &str) {
match method {
"initialized" => {
tracing::info!("Client initialized");
}
"notifications/cancelled" => {
tracing::debug!("Request cancelled by client");
}
"notifications/devboy/compact" => {
tracing::info!("Host compaction signal received — advancing dedup partition");
self.on_compaction_boundary();
}
_ => {
tracing::debug!("Ignoring notification: {}", method);
}
}
}
fn handle_initialize(&mut self, id: RequestId, params: Option<Value>) -> JsonRpcResponse {
if self.initialized {
return JsonRpcResponse::error(
id,
JsonRpcError::invalid_request("Server already initialized"),
);
}
if let Some(params) = params {
match serde_json::from_value::<InitializeParams>(params) {
Ok(init_params) => {
tracing::info!(
"Client: {} v{} (protocol: {})",
init_params.client_info.name,
init_params.client_info.version,
init_params.protocol_version
);
}
Err(e) => {
tracing::warn!("Failed to parse initialize params: {}", e);
}
}
}
self.initialized = true;
let result = InitializeResult {
protocol_version: MCP_VERSION.to_string(),
capabilities: ServerCapabilities {
tools: Some(ToolsCapability {
list_changed: false,
}),
resources: None,
prompts: None,
},
server_info: ServerInfo {
name: "devboy-mcp".to_string(),
version: env!("CARGO_PKG_VERSION").to_string(),
},
};
JsonRpcResponse::success(id, serde_json::to_value(result).unwrap())
}
pub fn handle_tools_list(&self, id: RequestId) -> JsonRpcResponse {
let providers = self.active_providers();
let base_tools = devboy_executor::tools::base_tool_definitions();
let mut tools: Vec<crate::protocol::ToolDefinition> = base_tools
.into_iter()
.map(|t| {
let mut schema = serde_json::to_value(&t.input_schema).unwrap_or_default();
if let Some(obj) = schema.as_object_mut() {
obj.entry("type").or_insert_with(|| "object".into());
}
crate::protocol::ToolDefinition {
name: t.name,
description: t.description,
input_schema: schema,
category: Some(t.category),
}
})
.collect();
use devboy_core::IssueProvider;
let has_issue_providers = !providers.is_empty();
let has_mr_providers = providers.iter().any(|p| {
matches!(
IssueProvider::provider_name(p.as_ref()),
"github" | "gitlab"
)
});
let has_jira_provider = providers
.iter()
.any(|p| IssueProvider::provider_name(p.as_ref()) == "jira");
let has_meeting_providers = !self.meeting_providers.is_empty();
let has_knowledge_base_providers = !self.active_knowledge_base_providers().is_empty();
let has_messenger_providers = !self.active_messenger_providers().is_empty();
let any_upload = providers
.iter()
.any(|p| p.asset_capabilities().issue.upload);
let any_delete = providers
.iter()
.any(|p| p.asset_capabilities().issue.delete);
tools.retain(|t| {
match t.name.as_str() {
"upload_asset" => return any_upload,
"delete_asset" => return any_delete,
_ => {}
}
t.category
.map(|cat| match cat {
devboy_core::ToolCategory::IssueTracker => has_issue_providers,
devboy_core::ToolCategory::Epics => has_issue_providers,
devboy_core::ToolCategory::GitRepository => has_mr_providers,
devboy_core::ToolCategory::MeetingNotes => has_meeting_providers,
devboy_core::ToolCategory::KnowledgeBase => has_knowledge_base_providers,
devboy_core::ToolCategory::Messenger => has_messenger_providers,
devboy_core::ToolCategory::Releases => has_mr_providers,
devboy_core::ToolCategory::JiraStructure => has_jira_provider,
})
.unwrap_or(true) });
for tool in devboy_executor::tools::mcp_only_tools() {
let mut schema = serde_json::to_value(&tool.input_schema)
.expect("McpOnlyTool::input_schema must be serializable");
if let Some(obj) = schema.as_object_mut() {
obj.entry("type").or_insert_with(|| "object".into());
}
tools.push(crate::protocol::ToolDefinition {
name: tool.name,
description: tool.description,
input_schema: schema,
category: None,
});
}
if !self.builtin_tools_config.is_empty() {
tools.retain(|t| self.builtin_tools_config.is_tool_allowed(&t.name));
}
tools.extend(self.proxy_manager.all_tools());
let result = ToolsListResult { tools };
JsonRpcResponse::success(id, serde_json::to_value(result).unwrap())
}
async fn handle_tools_call(&mut self, id: RequestId, params: Option<Value>) -> JsonRpcResponse {
let params: ToolCallParams = match params {
Some(p) => match serde_json::from_value(p) {
Ok(params) => params,
Err(e) => {
return JsonRpcResponse::error(
id,
JsonRpcError::invalid_params(&e.to_string()),
);
}
},
None => {
return JsonRpcResponse::error(id, JsonRpcError::invalid_params("Missing params"));
}
};
tracing::info!("Calling tool: {}", params.name);
if !self.builtin_tools_config.is_empty()
&& !self.builtin_tools_config.is_tool_allowed(¶ms.name)
&& !self.proxy_manager.has_tool(¶ms.name)
{
return JsonRpcResponse::error(
id,
JsonRpcError::method_not_found(&format!(
"Tool '{}' is disabled by builtin_tools configuration",
params.name
)),
);
}
if let Some(result) = self.handle_internal_tool(¶ms).await {
return JsonRpcResponse::success(id, serde_json::to_value(result).unwrap());
}
if let Some(pipeline) = &self.layered_pipeline
&& is_mutating_tool(¶ms.name)
&& let Some(path) = file_path_from_args(params.arguments.as_ref())
{
pipeline.invalidate_file(&path);
}
if let Some(pipeline) = &self.layered_pipeline
&& pipeline.should_skip(¶ms.name)
{
pipeline.record_fail_fast_skip(40);
let hint = format!(
"> [enrichment: '{}' fail-fast — last calls returned 0 bytes; planner refuses to re-issue. Try a different query.]",
params.name
);
return JsonRpcResponse::success(
id,
serde_json::to_value(ToolCallResult::text(hint)).unwrap(),
);
}
let started = Instant::now();
let (result, was_fallback, emitted_reason, emitted_detail, upstream_label, resolved_name) =
self.dispatch_with_routing(¶ms).await;
let result = if let Some(pipeline) = &self.layered_pipeline {
let req_id = match &id {
RequestId::Number(n) => format!("req_{n}"),
RequestId::String(s) => s.clone(),
RequestId::Null => "req_null".to_string(),
};
let ts_ms = std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.map(|d| d.as_millis() as i64)
.unwrap_or(0);
pipeline.process(&req_id, ¶ms, result, ts_ms)
} else {
result
};
let result = if let Some(pipeline) = &self.layered_pipeline
&& result.is_error != Some(true)
{
let prev_json = result
.content
.first()
.map(|c| {
let crate::protocol::ToolResultContent::Text { text } = c;
serde_json::Value::String(text.clone())
})
.unwrap_or(serde_json::Value::Null);
let hint = pipeline.speculate_after(¶ms.name, &prev_json).await;
if !hint.is_empty() {
let mut new_result = result.clone();
if let Some(last) = new_result.content.last_mut() {
let crate::protocol::ToolResultContent::Text { text } = last;
text.push_str(&hint);
}
new_result
} else {
result
}
} else {
result
};
if let Some(buffer) = &self.telemetry {
let latency_ms = started.elapsed().as_millis() as u64;
let status = if result.is_error == Some(true) {
TelemetryStatus::Error
} else {
TelemetryStatus::Success
};
let mut event = TelemetryEvent::now(&resolved_name, emitted_reason);
event.routing_detail = emitted_detail;
event.upstream = upstream_label;
event.status = status;
event.latency_ms = latency_ms;
event.was_fallback = was_fallback;
buffer.record(event).await;
}
JsonRpcResponse::success(id, serde_json::to_value(result).unwrap())
}
async fn handle_internal_tool(&self, params: &ToolCallParams) -> Option<ToolCallResult> {
match params.name.as_str() {
"list_contexts" => {
let active = self.active_context_name();
let names = self.context_names();
let content = names
.into_iter()
.map(|name| {
if name == active {
format!("* {} (active)", name)
} else {
format!("* {}", name)
}
})
.collect::<Vec<_>>()
.join("\n");
Some(ToolCallResult::text(content))
}
"get_current_context" => Some(ToolCallResult::text(self.active_context_name())),
"compact_pipeline_cache" => {
self.on_compaction_boundary();
Some(ToolCallResult::text(
"pipeline cache partition advanced".to_string(),
))
}
"use_context" => {
#[derive(Deserialize)]
struct UseContextParams {
name: String,
}
Some(match ¶ms.arguments {
Some(args) => match serde_json::from_value::<UseContextParams>(args.clone()) {
Ok(args) => match self.set_active_context(&args.name) {
Ok(()) => ToolCallResult::text(format!(
"Active context set to '{}'",
args.name
)),
Err(e) => ToolCallResult::error(e.to_string()),
},
Err(e) => ToolCallResult::error(format!("Invalid parameters: {}", e)),
},
None => ToolCallResult::error("Missing required parameter: name".to_string()),
})
}
_ => None,
}
}
async fn dispatch_with_routing(
&self,
params: &ToolCallParams,
) -> (
ToolCallResult,
bool,
String,
Option<String>,
Option<String>,
String,
) {
let Some(engine) = self.routing_engine.clone() else {
let result = self.legacy_dispatch(params).await;
let (reason, resolved) = if self.proxy_manager.has_tool(¶ms.name) {
let stripped = params
.name
.split_once("__")
.map(|(_, rest)| rest.to_string())
.unwrap_or_else(|| params.name.clone());
("legacy_remote", stripped)
} else {
("legacy_local", params.name.clone())
};
return (result, false, reason.to_string(), None, None, resolved);
};
let decision = engine.decide(¶ms.name);
let reason_label = decision.reason.as_label().to_string();
let reason_detail = decision.reason.detail().map(String::from);
let resolved_name = decision.resolved_name.clone();
let primary = decision.primary.clone();
let result = self
.execute_target(&primary, &decision.resolved_name, params.arguments.clone())
.await;
let upstream_label = match &primary {
RoutingTarget::Remote { prefix, .. } => Some(prefix.clone()),
_ => None,
};
if result.is_error == Some(true)
&& let Some(fallback) = &decision.fallback
{
tracing::warn!(
tool = params.name.as_str(),
primary_target = ?primary,
"primary executor errored; retrying via fallback"
);
let fb_result = self
.execute_target(fallback, &decision.resolved_name, params.arguments.clone())
.await;
let fb_upstream = match fallback {
RoutingTarget::Remote { prefix, .. } => Some(prefix.clone()),
_ => None,
};
return (
fb_result,
true,
reason_label,
reason_detail,
fb_upstream,
resolved_name,
);
}
(
result,
false,
reason_label,
reason_detail,
upstream_label,
resolved_name,
)
}
async fn execute_target(
&self,
target: &RoutingTarget,
unprefixed_name: &str,
arguments: Option<Value>,
) -> ToolCallResult {
match target {
RoutingTarget::Local => self.dispatch_builtin_tool(unprefixed_name, arguments).await,
RoutingTarget::Remote {
prefix,
original_name,
} => self
.proxy_manager
.call_by_prefix(prefix, original_name, arguments)
.await
.unwrap_or_else(|| {
ToolCallResult::error(format!(
"No upstream MCP server connected with prefix '{}'",
prefix
))
}),
RoutingTarget::Reject => ToolCallResult::error(format!(
"Tool '{}' is not available (unknown to both local and remote catalogues)",
unprefixed_name
)),
}
}
pub async fn execute_for_prefetch(
&self,
name: &str,
arguments: Option<Value>,
) -> ToolCallResult {
if !self.builtin_tools_config.is_empty()
&& !self.builtin_tools_config.is_tool_allowed(name)
&& !self.proxy_manager.has_tool(name)
{
return ToolCallResult::error(format!(
"Tool '{name}' is disabled by builtin_tools configuration"
));
}
if Self::is_internal_tool(name) {
return ToolCallResult::error(format!(
"Tool '{name}' is internal — never speculatable"
));
}
let params = ToolCallParams {
name: name.to_string(),
arguments,
};
match self.routing_engine.clone() {
Some(engine) => {
let decision = engine.decide(name);
self.execute_target(&decision.primary, &decision.resolved_name, params.arguments)
.await
}
None => self.legacy_dispatch(¶ms).await,
}
}
fn is_internal_tool(name: &str) -> bool {
matches!(
name,
"use_context" | "list_contexts" | "get_current_context" | "switch_context"
)
}
async fn legacy_dispatch(&self, params: &ToolCallParams) -> ToolCallResult {
if let Some(result) = self
.proxy_manager
.try_call(¶ms.name, params.arguments.clone())
.await
{
return result;
}
self.dispatch_builtin_tool(¶ms.name, params.arguments.clone())
.await
}
async fn dispatch_builtin_tool(&self, name: &str, arguments: Option<Value>) -> ToolCallResult {
let executor = self.create_executor();
let args = arguments.unwrap_or(Value::Null);
let category = devboy_executor::Executor::tool_category(name);
match category {
Some(devboy_core::ToolCategory::MeetingNotes) => {
for provider in &self.meeting_providers {
match executor
.execute_direct_meeting(name, args.clone(), provider.as_ref())
.await
{
Ok(output) => return output_to_result(output),
Err(e) => {
tracing::debug!("Meeting provider failed: {}", e);
continue;
}
}
}
ToolCallResult::error(format!("No meeting provider supports '{}'", name))
}
Some(devboy_core::ToolCategory::Messenger) => {
for provider in &self.active_messenger_providers() {
match executor
.execute_direct_messenger(name, args.clone(), provider.as_ref())
.await
{
Ok(output) => return output_to_result(output),
Err(e) => {
tracing::debug!("Messenger provider failed: {}", e);
continue;
}
}
}
ToolCallResult::error(format!("No messenger provider supports '{}'", name))
}
Some(devboy_core::ToolCategory::KnowledgeBase) => {
for provider in &self.active_knowledge_base_providers() {
match executor
.execute_direct_knowledge_base(name, args.clone(), provider.as_ref())
.await
{
Ok(output) => return output_to_result(output),
Err(e) => {
tracing::debug!("Knowledge base provider failed: {}", e);
continue;
}
}
}
ToolCallResult::error(format!("No knowledge base provider supports '{}'", name))
}
_ => {
let providers = self.active_providers();
if providers.is_empty() {
return ToolCallResult::error("No providers configured".to_string());
}
for provider in &providers {
match executor
.execute_direct(name, args.clone(), provider.as_ref())
.await
{
Ok(output) => return output_to_result(output),
Err(e) if should_try_next_provider(&e) => continue,
Err(e) => return ToolCallResult::error(format!("{e}")),
}
}
ToolCallResult::error(format!("No provider supports '{}'", name))
}
}
}
fn create_executor(&self) -> devboy_executor::Executor {
let mut executor = devboy_executor::Executor::new();
if let Ok(mgr) =
devboy_assets::AssetManager::from_config(devboy_assets::AssetConfig::default())
{
executor = executor.with_asset_manager(mgr);
}
if !self.active_knowledge_base_providers().is_empty() {
executor.add_enricher(Box::new(devboy_confluence::ConfluenceSchemaEnricher::new()));
}
executor
}
fn handle_ping(&self, id: RequestId) -> JsonRpcResponse {
JsonRpcResponse::success(id, serde_json::json!({}))
}
}
fn output_to_result(output: devboy_executor::ToolOutput) -> ToolCallResult {
match devboy_executor::format_output(output, None, None, None) {
Ok(formatted) => ToolCallResult::text(formatted.content),
Err(e) => ToolCallResult::error(format!("Format error: {e}")),
}
}
fn should_try_next_provider(e: &devboy_core::Error) -> bool {
if matches!(
e,
devboy_core::Error::ProviderUnsupported { .. } | devboy_core::Error::ProviderNotFound(_)
) {
return true;
}
if let devboy_core::Error::InvalidData(msg) = e {
let lower = msg.to_ascii_lowercase();
let is_key_prefix_mismatch = (lower.contains("invalid") && lower.contains("key"))
|| lower.contains("unsupported key prefix");
if is_key_prefix_mismatch {
return true;
}
}
false
}
impl Default for McpServer {
fn default() -> Self {
Self::new()
}
}
#[cfg(test)]
mod tests {
use super::*;
use crate::protocol::{JSONRPC_VERSION, RequestId, ToolCallResult, ToolResultContent};
#[test]
fn should_try_next_provider_retries_unsupported_and_not_found() {
assert!(should_try_next_provider(
&devboy_core::Error::ProviderUnsupported {
provider: "github".into(),
operation: "get_structures".into(),
}
));
assert!(should_try_next_provider(
&devboy_core::Error::ProviderNotFound("jira".into())
));
}
#[test]
fn should_try_next_provider_retries_key_prefix_mismatch_invalid_data() {
assert!(should_try_next_provider(&devboy_core::Error::InvalidData(
"Invalid issue key: gitlab#1".into()
)));
assert!(should_try_next_provider(&devboy_core::Error::InvalidData(
"Invalid PR key: gh#7".into()
)));
assert!(should_try_next_provider(&devboy_core::Error::InvalidData(
"Invalid mr key: pr#4".into()
)));
}
#[test]
fn should_try_next_provider_bubbles_up_real_errors() {
assert!(!should_try_next_provider(&devboy_core::Error::NotFound(
"No workflow runs found for branch 'main'".into()
)));
assert!(!should_try_next_provider(&devboy_core::Error::Http(
"500 Internal Server Error".into()
)));
assert!(!should_try_next_provider(
&devboy_core::Error::Unauthorized("Bad credentials".into())
));
assert!(!should_try_next_provider(&devboy_core::Error::InvalidData(
"invalid get_issues params: expected string, found integer".into()
)));
}
use async_trait::async_trait;
use devboy_core::types::ChatType;
use devboy_core::{
Comment, CreateCommentInput, CreateIssueInput, Discussion, FileDiff, GetChatsParams,
GetMessagesParams, Issue, IssueFilter, IssueProvider, KbPage, KbPageContent, KbSpace,
KnowledgeBaseProvider, ListPagesParams, MergeRequest, MergeRequestProvider, MessageAuthor,
MessengerChat, MessengerMessage, MessengerProvider, MrFilter, SearchKbParams,
SearchMessagesParams, SendMessageParams, UpdateIssueInput, User,
};
struct TestProvider;
#[async_trait]
impl IssueProvider for TestProvider {
async fn get_issues(
&self,
_filter: IssueFilter,
) -> devboy_core::Result<devboy_core::ProviderResult<Issue>> {
Ok(vec![].into())
}
async fn get_issue(&self, _key: &str) -> devboy_core::Result<Issue> {
Err(devboy_core::Error::NotFound("not found".into()))
}
async fn create_issue(&self, _input: CreateIssueInput) -> devboy_core::Result<Issue> {
Err(devboy_core::Error::NotFound("not found".into()))
}
async fn update_issue(
&self,
_key: &str,
_input: UpdateIssueInput,
) -> devboy_core::Result<Issue> {
Err(devboy_core::Error::NotFound("not found".into()))
}
async fn get_comments(
&self,
_issue_key: &str,
) -> devboy_core::Result<devboy_core::ProviderResult<Comment>> {
Ok(vec![].into())
}
async fn add_comment(&self, _issue_key: &str, _body: &str) -> devboy_core::Result<Comment> {
Err(devboy_core::Error::NotFound("not found".into()))
}
fn provider_name(&self) -> &'static str {
"github" }
}
#[async_trait]
impl MergeRequestProvider for TestProvider {
async fn get_merge_requests(
&self,
_filter: MrFilter,
) -> devboy_core::Result<devboy_core::ProviderResult<MergeRequest>> {
Ok(vec![].into())
}
async fn get_merge_request(&self, _key: &str) -> devboy_core::Result<MergeRequest> {
Err(devboy_core::Error::NotFound("not found".into()))
}
async fn get_discussions(
&self,
_mr_key: &str,
) -> devboy_core::Result<devboy_core::ProviderResult<Discussion>> {
Ok(vec![].into())
}
async fn get_diffs(
&self,
_mr_key: &str,
) -> devboy_core::Result<devboy_core::ProviderResult<FileDiff>> {
Ok(vec![].into())
}
async fn add_comment(
&self,
_mr_key: &str,
_input: CreateCommentInput,
) -> devboy_core::Result<Comment> {
Err(devboy_core::Error::NotFound("not found".into()))
}
fn provider_name(&self) -> &'static str {
"github" }
}
#[async_trait]
impl devboy_core::PipelineProvider for TestProvider {
fn provider_name(&self) -> &'static str {
"test"
}
}
#[async_trait]
impl Provider for TestProvider {
async fn get_current_user(&self) -> devboy_core::Result<User> {
Ok(User {
id: "1".to_string(),
username: "test".to_string(),
name: None,
email: None,
avatar_url: None,
})
}
}
struct TestMessengerProvider;
#[async_trait]
impl MessengerProvider for TestMessengerProvider {
fn provider_name(&self) -> &'static str {
"slack"
}
async fn get_chats(
&self,
_params: GetChatsParams,
) -> devboy_core::Result<devboy_core::ProviderResult<MessengerChat>> {
Ok(vec![MessengerChat {
id: "C123".to_string(),
key: "slack:C123".to_string(),
name: "general".to_string(),
chat_type: ChatType::Channel,
source: "slack".to_string(),
member_count: Some(3),
description: None,
is_active: true,
}]
.into())
}
async fn get_messages(
&self,
_params: GetMessagesParams,
) -> devboy_core::Result<devboy_core::ProviderResult<MessengerMessage>> {
Ok(vec![].into())
}
async fn search_messages(
&self,
_params: SearchMessagesParams,
) -> devboy_core::Result<devboy_core::ProviderResult<MessengerMessage>> {
Ok(vec![].into())
}
async fn send_message(
&self,
_params: SendMessageParams,
) -> devboy_core::Result<MessengerMessage> {
Ok(MessengerMessage {
id: "1710000000.000100".to_string(),
chat_id: "C123".to_string(),
text: "test".to_string(),
author: MessageAuthor {
id: "U123".to_string(),
name: "DevBoy".to_string(),
username: Some("devboy".to_string()),
avatar_url: None,
},
source: "slack".to_string(),
timestamp: "1710000000.000100".to_string(),
thread_id: None,
reply_to_id: None,
attachments: vec![],
is_edited: false,
})
}
}
struct TestKnowledgeBaseProvider;
#[async_trait]
impl KnowledgeBaseProvider for TestKnowledgeBaseProvider {
fn provider_name(&self) -> &'static str {
"confluence"
}
async fn get_spaces(&self) -> devboy_core::Result<devboy_core::ProviderResult<KbSpace>> {
Ok(vec![KbSpace {
id: "space-1".to_string(),
key: "ENG".to_string(),
name: "Engineering".to_string(),
space_type: Some("global".to_string()),
status: Some("current".to_string()),
description: Some("Team docs".to_string()),
url: Some("https://wiki.example.com/spaces/ENG".to_string()),
}]
.into())
}
async fn list_pages(
&self,
_params: ListPagesParams,
) -> devboy_core::Result<devboy_core::ProviderResult<KbPage>> {
Ok(vec![KbPage {
id: "42".to_string(),
title: "Architecture".to_string(),
space_key: Some("ENG".to_string()),
url: Some("https://wiki.example.com/pages/42".to_string()),
version: Some(3),
last_modified: None,
author: Some("Alice".to_string()),
excerpt: Some("System architecture".to_string()),
}]
.into())
}
async fn get_page(&self, page_id: &str) -> devboy_core::Result<KbPageContent> {
Ok(KbPageContent {
page: KbPage {
id: page_id.to_string(),
title: "Architecture".to_string(),
space_key: Some("ENG".to_string()),
url: Some(format!("https://wiki.example.com/pages/{page_id}")),
version: Some(3),
last_modified: None,
author: Some("Alice".to_string()),
excerpt: Some("System architecture".to_string()),
},
content: "# Architecture".to_string(),
content_type: "markdown".to_string(),
ancestors: vec![],
labels: vec!["docs".to_string()],
})
}
async fn create_page(
&self,
_params: devboy_core::CreatePageParams,
) -> devboy_core::Result<KbPage> {
Err(devboy_core::Error::ProviderUnsupported {
provider: "confluence".to_string(),
operation: "create_page".to_string(),
})
}
async fn update_page(
&self,
_params: devboy_core::UpdatePageParams,
) -> devboy_core::Result<KbPage> {
Err(devboy_core::Error::ProviderUnsupported {
provider: "confluence".to_string(),
operation: "update_page".to_string(),
})
}
async fn search(
&self,
_params: SearchKbParams,
) -> devboy_core::Result<devboy_core::ProviderResult<KbPage>> {
Ok(vec![KbPage {
id: "42".to_string(),
title: "Architecture".to_string(),
space_key: Some("ENG".to_string()),
url: Some("https://wiki.example.com/pages/42".to_string()),
version: Some(3),
last_modified: None,
author: Some("Alice".to_string()),
excerpt: Some("System architecture".to_string()),
}]
.into())
}
}
#[test]
fn test_server_creation() {
let server = McpServer::new();
assert!(server.providers().is_empty());
assert!(!server.initialized);
}
#[test]
fn test_initialize_response() {
let mut server = McpServer::new();
let req = JsonRpcRequest {
jsonrpc: JSONRPC_VERSION.to_string(),
id: RequestId::Number(1),
method: "initialize".to_string(),
params: Some(serde_json::json!({
"protocolVersion": "2025-11-25",
"capabilities": {},
"clientInfo": {
"name": "test-client",
"version": "1.0.0"
}
})),
};
let resp = tokio::runtime::Runtime::new()
.unwrap()
.block_on(server.handle_request(req));
assert!(resp.result.is_some());
assert!(resp.error.is_none());
assert!(server.initialized);
}
#[test]
fn test_tools_list_without_providers() {
let server = McpServer::new();
let resp = server.handle_tools_list(RequestId::Number(1));
assert!(resp.result.is_some());
let result: ToolsListResult = serde_json::from_value(resp.result.unwrap()).unwrap();
assert!(result.tools.iter().any(|t| t.name == "list_contexts"));
assert!(result.tools.iter().any(|t| t.name == "use_context"));
assert!(result.tools.iter().any(|t| t.name == "get_current_context"));
assert!(!result.tools.iter().any(|t| t.name == "get_issues"));
assert!(!result.tools.iter().any(|t| t.name == "get_merge_requests"));
}
#[test]
fn test_tools_list_with_provider() {
let mut server = McpServer::new();
server.add_provider(Arc::new(TestProvider));
let resp = server.handle_tools_list(RequestId::Number(1));
assert!(resp.result.is_some());
let result: ToolsListResult = serde_json::from_value(resp.result.unwrap()).unwrap();
assert!(!result.tools.is_empty());
assert!(result.tools.iter().any(|t| t.name == "get_issues"));
assert!(result.tools.iter().any(|t| t.name == "get_merge_requests"));
assert!(result.tools.iter().any(|t| t.name == "list_contexts"));
assert!(result.tools.iter().any(|t| t.name == "use_context"));
assert!(result.tools.iter().any(|t| t.name == "get_current_context"));
}
#[test]
fn test_ping() {
let server = McpServer::new();
let resp = server.handle_ping(RequestId::String("ping-1".to_string()));
assert!(resp.result.is_some());
assert!(resp.error.is_none());
}
#[test]
fn test_double_initialize_error() {
let mut server = McpServer::new();
server.initialized = true;
let resp = server.handle_initialize(RequestId::Number(1), None);
assert!(resp.error.is_some());
assert!(resp.result.is_none());
}
#[test]
fn test_unknown_method() {
let mut server = McpServer::new();
let req = JsonRpcRequest {
jsonrpc: JSONRPC_VERSION.to_string(),
id: RequestId::Number(1),
method: "unknown/method".to_string(),
params: None,
};
let resp = tokio::runtime::Runtime::new()
.unwrap()
.block_on(server.handle_request(req));
assert!(resp.error.is_some());
assert_eq!(resp.error.unwrap().code, JsonRpcError::METHOD_NOT_FOUND);
}
#[test]
fn test_add_provider_and_providers() {
let mut server = McpServer::new();
assert!(server.providers().is_empty());
server.add_provider(Arc::new(TestProvider));
assert_eq!(server.providers().len(), 1);
}
#[test]
fn test_handle_notification_initialized() {
let mut server = McpServer::new();
server.handle_notification("initialized");
}
#[test]
fn test_handle_notification_cancelled() {
let mut server = McpServer::new();
server.handle_notification("notifications/cancelled");
}
#[test]
fn test_handle_notification_unknown() {
let mut server = McpServer::new();
server.handle_notification("some/unknown/notification");
}
#[tokio::test]
async fn test_handle_message_notification() {
let mut server = McpServer::new();
let msg = IncomingMessage::Notification(crate::protocol::JsonRpcNotification {
jsonrpc: JSONRPC_VERSION.to_string(),
method: "initialized".to_string(),
params: None,
});
let response = server.handle_message(msg).await;
assert!(response.is_none());
}
#[tokio::test]
async fn test_handle_message_request() {
let mut server = McpServer::new();
let msg = IncomingMessage::Request(JsonRpcRequest {
jsonrpc: JSONRPC_VERSION.to_string(),
id: RequestId::Number(1),
method: "ping".to_string(),
params: None,
});
let response = server.handle_message(msg).await;
assert!(response.is_some());
let resp = response.unwrap();
assert!(resp.result.is_some());
}
#[tokio::test]
async fn test_handle_tools_call() {
let mut server = McpServer::new();
let req = JsonRpcRequest {
jsonrpc: JSONRPC_VERSION.to_string(),
id: RequestId::Number(1),
method: "tools/call".to_string(),
params: Some(serde_json::json!({
"name": "get_issues",
"arguments": {}
})),
};
let resp = server.handle_request(req).await;
assert!(resp.result.is_some());
}
#[tokio::test]
async fn test_handle_tools_call_missing_params() {
let mut server = McpServer::new();
let req = JsonRpcRequest {
jsonrpc: JSONRPC_VERSION.to_string(),
id: RequestId::Number(1),
method: "tools/call".to_string(),
params: None,
};
let resp = server.handle_request(req).await;
assert!(resp.error.is_some());
}
#[tokio::test]
async fn test_handle_tools_call_invalid_params() {
let mut server = McpServer::new();
let req = JsonRpcRequest {
jsonrpc: JSONRPC_VERSION.to_string(),
id: RequestId::Number(1),
method: "tools/call".to_string(),
params: Some(serde_json::json!("not an object")),
};
let resp = server.handle_request(req).await;
assert!(resp.error.is_some());
}
#[test]
fn test_initialize_without_params() {
let mut server = McpServer::new();
let resp = server.handle_initialize(RequestId::Number(1), None);
assert!(resp.result.is_some());
assert!(resp.error.is_none());
assert!(server.initialized);
}
#[test]
fn test_initialize_with_invalid_params() {
let mut server = McpServer::new();
let resp = server.handle_initialize(
RequestId::Number(1),
Some(serde_json::json!({"invalid": true})),
);
assert!(resp.result.is_some());
assert!(server.initialized);
}
#[test]
fn test_default_trait() {
let server = McpServer::default();
assert!(server.providers().is_empty());
}
#[test]
fn test_context_switch_missing_context() {
let server = McpServer::new();
let err = server.set_active_context("missing").unwrap_err();
assert!(err.to_string().contains("not found"));
}
#[test]
fn test_context_names_and_active_context_switch() {
let server = McpServer::new();
assert_eq!(server.active_context_name(), "default".to_string());
assert_eq!(server.context_names(), vec!["default".to_string()]);
let mut server = server;
server.ensure_context("workspace");
assert_eq!(
server.context_names(),
vec!["default".to_string(), "workspace".to_string()]
);
server.set_active_context("workspace").unwrap();
assert_eq!(server.active_context_name(), "workspace".to_string());
}
#[tokio::test]
async fn test_tools_call_get_current_context() {
let mut server = McpServer::new();
server.contexts.insert("workspace".to_string(), vec![]);
server.set_active_context("workspace").unwrap();
let req = JsonRpcRequest {
jsonrpc: JSONRPC_VERSION.to_string(),
id: RequestId::Number(1),
method: "tools/call".to_string(),
params: Some(serde_json::json!({
"name": "get_current_context",
"arguments": {}
})),
};
let resp = server.handle_request(req).await;
assert!(resp.error.is_none());
let result: ToolCallResult = serde_json::from_value(resp.result.unwrap()).unwrap();
let text = match &result.content[0] {
ToolResultContent::Text { text } => text,
};
assert_eq!(text, "workspace");
assert_eq!(result.is_error, None);
}
#[tokio::test]
async fn test_tools_call_list_contexts_marks_active() {
let mut server = McpServer::new();
server.contexts.insert("workspace".to_string(), vec![]);
server.set_active_context("workspace").unwrap();
let req = JsonRpcRequest {
jsonrpc: JSONRPC_VERSION.to_string(),
id: RequestId::Number(2),
method: "tools/call".to_string(),
params: Some(serde_json::json!({
"name": "list_contexts",
"arguments": {}
})),
};
let resp = server.handle_request(req).await;
assert!(resp.error.is_none());
let result: ToolCallResult = serde_json::from_value(resp.result.unwrap()).unwrap();
let text = match &result.content[0] {
ToolResultContent::Text { text } => text,
};
assert!(text.contains("* default"));
assert!(text.contains("* workspace (active)"));
}
#[tokio::test]
async fn test_tools_call_use_context_success_and_error_paths() {
let mut server = McpServer::new();
server.contexts.insert("workspace".to_string(), vec![]);
let missing_name_req = JsonRpcRequest {
jsonrpc: JSONRPC_VERSION.to_string(),
id: RequestId::Number(3),
method: "tools/call".to_string(),
params: Some(serde_json::json!({
"name": "use_context",
"arguments": {}
})),
};
let missing_name_resp = server.handle_request(missing_name_req).await;
let missing_name_result: ToolCallResult =
serde_json::from_value(missing_name_resp.result.unwrap()).unwrap();
assert_eq!(missing_name_result.is_error, Some(true));
let missing_context_req = JsonRpcRequest {
jsonrpc: JSONRPC_VERSION.to_string(),
id: RequestId::Number(4),
method: "tools/call".to_string(),
params: Some(serde_json::json!({
"name": "use_context",
"arguments": { "name": "missing" }
})),
};
let missing_context_resp = server.handle_request(missing_context_req).await;
let missing_context_result: ToolCallResult =
serde_json::from_value(missing_context_resp.result.unwrap()).unwrap();
assert_eq!(missing_context_result.is_error, Some(true));
let success_req = JsonRpcRequest {
jsonrpc: JSONRPC_VERSION.to_string(),
id: RequestId::Number(5),
method: "tools/call".to_string(),
params: Some(serde_json::json!({
"name": "use_context",
"arguments": { "name": "workspace" }
})),
};
let success_resp = server.handle_request(success_req).await;
let success_result: ToolCallResult =
serde_json::from_value(success_resp.result.unwrap()).unwrap();
assert_eq!(success_result.is_error, None);
assert_eq!(server.active_context_name(), "workspace".to_string());
}
#[test]
fn test_set_proxy_manager() {
let mut server = McpServer::new();
let proxy_manager = ProxyManager::new();
server.set_proxy_manager(proxy_manager);
}
#[test]
fn test_tools_list_includes_proxy_tools() {
let mut server = McpServer::new();
server.add_provider(Arc::new(TestProvider));
let proxy_manager = ProxyManager::new();
server.set_proxy_manager(proxy_manager);
let resp = server.handle_tools_list(RequestId::Number(1));
let result: ToolsListResult = serde_json::from_value(resp.result.unwrap()).unwrap();
assert!(result.tools.iter().any(|t| t.name == "get_issues"));
assert!(result.tools.iter().any(|t| t.name == "list_contexts"));
assert!(result.tools.iter().any(|t| t.name == "use_context"));
assert!(result.tools.iter().any(|t| t.name == "get_current_context"));
assert!(!result.tools.iter().any(|t| t.name.contains("__")));
}
#[test]
fn test_default_server_has_empty_proxy_manager() {
let server = McpServer::default();
let resp = server.handle_tools_list(RequestId::Number(1));
let result: ToolsListResult = serde_json::from_value(resp.result.unwrap()).unwrap();
assert!(!result.tools.iter().any(|t| t.name.contains("__")));
}
#[test]
fn test_builtin_tools_disabled_filters_tools_list() {
let mut server = McpServer::new();
server.add_provider(Arc::new(TestProvider));
server
.set_builtin_tools_config(BuiltinToolsConfig {
disabled: vec!["get_issues".to_string(), "create_issue".to_string()],
enabled: vec![],
})
.unwrap();
let resp = server.handle_tools_list(RequestId::Number(1));
let result: ToolsListResult = serde_json::from_value(resp.result.unwrap()).unwrap();
assert!(!result.tools.iter().any(|t| t.name == "get_issues"));
assert!(!result.tools.iter().any(|t| t.name == "create_issue"));
assert!(result.tools.iter().any(|t| t.name == "get_merge_requests"));
assert!(result.tools.iter().any(|t| t.name == "list_contexts"));
}
#[test]
fn test_builtin_tools_enabled_whitelist_filters_tools_list() {
let mut server = McpServer::new();
server
.set_builtin_tools_config(BuiltinToolsConfig {
disabled: vec![],
enabled: vec![
"list_contexts".to_string(),
"use_context".to_string(),
"get_current_context".to_string(),
],
})
.unwrap();
let resp = server.handle_tools_list(RequestId::Number(1));
let result: ToolsListResult = serde_json::from_value(resp.result.unwrap()).unwrap();
assert_eq!(result.tools.len(), 3);
assert!(result.tools.iter().any(|t| t.name == "list_contexts"));
assert!(result.tools.iter().any(|t| t.name == "use_context"));
assert!(result.tools.iter().any(|t| t.name == "get_current_context"));
assert!(!result.tools.iter().any(|t| t.name == "get_issues"));
}
#[tokio::test]
async fn test_disabled_tool_call_returns_error() {
let mut server = McpServer::new();
server
.set_builtin_tools_config(BuiltinToolsConfig {
disabled: vec!["get_issues".to_string()],
enabled: vec![],
})
.unwrap();
let req = JsonRpcRequest {
jsonrpc: JSONRPC_VERSION.to_string(),
id: RequestId::Number(1),
method: "tools/call".to_string(),
params: Some(serde_json::json!({
"name": "get_issues",
"arguments": {}
})),
};
let resp = server.handle_request(req).await;
assert!(resp.error.is_some());
let err = resp.error.unwrap();
assert_eq!(err.code, JsonRpcError::METHOD_NOT_FOUND);
assert!(err.message.contains("disabled"));
}
#[tokio::test]
async fn test_disabled_tool_allows_non_disabled() {
let mut server = McpServer::new();
server
.set_builtin_tools_config(BuiltinToolsConfig {
disabled: vec!["get_issues".to_string()],
enabled: vec![],
})
.unwrap();
let req = JsonRpcRequest {
jsonrpc: JSONRPC_VERSION.to_string(),
id: RequestId::Number(1),
method: "tools/call".to_string(),
params: Some(serde_json::json!({
"name": "get_current_context",
"arguments": {}
})),
};
let resp = server.handle_request(req).await;
assert!(resp.error.is_none());
assert!(resp.result.is_some());
}
struct IssueOnlyTestProvider;
#[async_trait]
impl IssueProvider for IssueOnlyTestProvider {
async fn get_issues(
&self,
_filter: IssueFilter,
) -> devboy_core::Result<devboy_core::ProviderResult<Issue>> {
Ok(vec![].into())
}
async fn get_issue(&self, _key: &str) -> devboy_core::Result<Issue> {
Err(devboy_core::Error::NotFound("not found".into()))
}
async fn create_issue(&self, _input: CreateIssueInput) -> devboy_core::Result<Issue> {
Err(devboy_core::Error::NotFound("not found".into()))
}
async fn update_issue(
&self,
_key: &str,
_input: UpdateIssueInput,
) -> devboy_core::Result<Issue> {
Err(devboy_core::Error::NotFound("not found".into()))
}
async fn get_comments(
&self,
_issue_key: &str,
) -> devboy_core::Result<devboy_core::ProviderResult<Comment>> {
Ok(vec![].into())
}
async fn add_comment(&self, _issue_key: &str, _body: &str) -> devboy_core::Result<Comment> {
Err(devboy_core::Error::NotFound("not found".into()))
}
fn provider_name(&self) -> &'static str {
"clickup" }
}
#[async_trait]
impl MergeRequestProvider for IssueOnlyTestProvider {
fn provider_name(&self) -> &'static str {
"clickup"
}
}
#[async_trait]
impl devboy_core::PipelineProvider for IssueOnlyTestProvider {
fn provider_name(&self) -> &'static str {
"test"
}
}
#[async_trait]
impl Provider for IssueOnlyTestProvider {
async fn get_current_user(&self) -> devboy_core::Result<User> {
Ok(User {
id: "1".to_string(),
username: "clickup-user".to_string(),
name: None,
email: None,
avatar_url: None,
})
}
}
#[test]
fn test_issue_only_provider_has_issue_tools_but_no_mr_tools() {
let mut server = McpServer::new();
server.add_provider(Arc::new(IssueOnlyTestProvider));
let resp = server.handle_tools_list(RequestId::Number(1));
let result: ToolsListResult = serde_json::from_value(resp.result.unwrap()).unwrap();
assert!(result.tools.iter().any(|t| t.name == "get_issues"));
assert!(result.tools.iter().any(|t| t.name == "get_issue"));
assert!(result.tools.iter().any(|t| t.name == "create_issue"));
assert!(!result.tools.iter().any(|t| t.name == "get_merge_requests"));
assert!(
!result
.tools
.iter()
.any(|t| t.name == "get_merge_request_discussions")
);
assert!(result.tools.iter().any(|t| t.name == "list_contexts"));
}
#[test]
fn test_add_provider_to_context() {
let mut server = McpServer::new();
server.ensure_context("custom");
server.add_provider_to_context("custom", Arc::new(TestProvider));
assert!(server.providers().is_empty());
server.set_active_context("custom").unwrap();
assert_eq!(server.active_providers().len(), 1);
}
#[test]
fn test_knowledge_base_tools_are_scoped_to_active_context() {
let mut server = McpServer::new();
server.ensure_context("wiki-context");
server.ensure_context("plain-context");
server.add_knowledge_base_provider_to_context(
"wiki-context",
Arc::new(TestKnowledgeBaseProvider),
);
server.set_active_context("plain-context").unwrap();
let plain_result: ToolsListResult = serde_json::from_value(
server
.handle_tools_list(RequestId::Number(1))
.result
.unwrap(),
)
.unwrap();
assert!(
!plain_result
.tools
.iter()
.any(|tool| tool.name == "get_knowledge_base_spaces")
);
server.set_active_context("wiki-context").unwrap();
let wiki_result: ToolsListResult = serde_json::from_value(
server
.handle_tools_list(RequestId::Number(2))
.result
.unwrap(),
)
.unwrap();
assert!(
wiki_result
.tools
.iter()
.any(|tool| tool.name == "get_knowledge_base_spaces")
);
}
#[test]
fn test_add_knowledge_base_provider_creates_context_for_activation() {
let mut server = McpServer::new();
server.add_knowledge_base_provider_to_context(
"wiki-only",
Arc::new(TestKnowledgeBaseProvider),
);
assert!(server.context_names().contains(&"wiki-only".to_string()));
assert!(server.set_active_context("wiki-only").is_ok());
}
#[tokio::test]
async fn test_tools_call_dispatches_knowledge_base_provider() {
let mut server = McpServer::new();
server.add_knowledge_base_provider(Arc::new(TestKnowledgeBaseProvider));
let req = JsonRpcRequest {
jsonrpc: JSONRPC_VERSION.to_string(),
id: RequestId::Number(6),
method: "tools/call".to_string(),
params: Some(serde_json::json!({
"name": "get_knowledge_base_spaces",
"arguments": {}
})),
};
let resp = server.handle_request(req).await;
assert!(resp.error.is_none());
let result: ToolCallResult = serde_json::from_value(resp.result.unwrap()).unwrap();
assert_eq!(result.is_error, None);
match &result.content[0] {
ToolResultContent::Text { text } => {
assert!(text.contains("Knowledge Base Spaces"));
assert!(text.contains("Engineering"));
}
}
}
#[test]
fn test_create_executor_registers_kb_enricher_for_active_context() {
let mut server = McpServer::new();
server.ensure_context("wiki-context");
server.ensure_context("plain-context");
server.add_knowledge_base_provider_to_context(
"wiki-context",
Arc::new(TestKnowledgeBaseProvider),
);
server.set_active_context("plain-context").unwrap();
let plain_tools = server.create_executor().list_tools();
assert!(
!plain_tools
.iter()
.any(|tool| tool.name == "get_knowledge_base_spaces")
);
server.set_active_context("wiki-context").unwrap();
let wiki_tools = server.create_executor().list_tools();
assert!(
wiki_tools
.iter()
.any(|tool| tool.name == "get_knowledge_base_spaces")
);
}
use crate::protocol::ToolDefinition;
use crate::routing::RoutingEngine;
use crate::signature_match::{MatchReport, ToolMatch};
use devboy_core::config::{ProxyRoutingConfig, RoutingStrategy};
fn match_report_with(items: Vec<ToolMatch>) -> MatchReport {
let mut r = MatchReport::default();
for m in items {
r.matches.insert(m.tool_name.clone(), m);
}
r
}
#[tokio::test]
async fn test_routing_engine_reject_decision_surfaces_as_error_result() {
let mut server = McpServer::new();
let engine = RoutingEngine::new(ProxyRoutingConfig::default(), MatchReport::default());
server.set_routing_engine(Arc::new(engine));
let req = JsonRpcRequest {
jsonrpc: JSONRPC_VERSION.to_string(),
id: RequestId::Number(1),
method: "tools/call".to_string(),
params: Some(serde_json::json!({
"name": "mystery_tool",
"arguments": {}
})),
};
let resp = server.handle_request(req).await;
let result: ToolCallResult = serde_json::from_value(resp.result.unwrap()).unwrap();
assert_eq!(result.is_error, Some(true));
match &result.content[0] {
ToolResultContent::Text { text } => {
assert!(text.contains("unknown to both local and remote"));
}
}
}
#[tokio::test]
async fn test_routing_engine_local_dispatch_uses_toolhandler() {
let mut server = McpServer::new();
server.add_provider(Arc::new(TestProvider));
let report = match_report_with(vec![ToolMatch {
tool_name: "get_issues".to_string(),
local_present: true,
remote_present: false,
schema_compatible: None,
upstream_prefix: None,
schema_mismatch: None,
}]);
let engine = RoutingEngine::new(ProxyRoutingConfig::default(), report);
server.set_routing_engine(Arc::new(engine));
let req = JsonRpcRequest {
jsonrpc: JSONRPC_VERSION.to_string(),
id: RequestId::Number(1),
method: "tools/call".to_string(),
params: Some(serde_json::json!({
"name": "get_issues",
"arguments": {}
})),
};
let resp = server.handle_request(req).await;
assert!(resp.error.is_none());
let result: ToolCallResult = serde_json::from_value(resp.result.unwrap()).unwrap();
assert!(result.is_error.is_none());
}
#[tokio::test]
async fn test_telemetry_buffer_receives_event_per_call() {
let mut server = McpServer::new();
server.add_provider(Arc::new(TestProvider));
let report = match_report_with(vec![ToolMatch {
tool_name: "get_issues".to_string(),
local_present: true,
remote_present: false,
schema_compatible: None,
upstream_prefix: None,
schema_mismatch: None,
}]);
let engine = RoutingEngine::new(ProxyRoutingConfig::default(), report);
server.set_routing_engine(Arc::new(engine));
let buffer = TelemetryBuffer::new(16);
server.set_telemetry(buffer.clone());
let req = JsonRpcRequest {
jsonrpc: JSONRPC_VERSION.to_string(),
id: RequestId::Number(1),
method: "tools/call".to_string(),
params: Some(serde_json::json!({
"name": "get_issues",
"arguments": {}
})),
};
let _resp = server.handle_request(req).await;
let events = buffer.drain(100).await;
assert_eq!(events.len(), 1);
assert_eq!(events[0].tool, "get_issues");
assert_eq!(events[0].routing_decision, "local_only");
assert_eq!(events[0].status, TelemetryStatus::Success);
}
#[tokio::test]
async fn test_telemetry_event_captures_reason_detail_for_override_rule() {
let mut server = McpServer::new();
server.add_provider(Arc::new(TestProvider));
let report = match_report_with(vec![ToolMatch {
tool_name: "get_issues".to_string(),
local_present: true,
remote_present: true,
schema_compatible: Some(true),
upstream_prefix: Some("cloud".to_string()),
schema_mismatch: None,
}]);
let config = ProxyRoutingConfig {
strategy: RoutingStrategy::Remote,
fallback_on_error: true,
tool_overrides: vec![devboy_core::config::ProxyToolRule {
pattern: "get_*".to_string(),
strategy: RoutingStrategy::Local,
}],
};
let engine = RoutingEngine::new(config, report);
server.set_routing_engine(Arc::new(engine));
let buffer = TelemetryBuffer::new(16);
server.set_telemetry(buffer.clone());
let req = JsonRpcRequest {
jsonrpc: JSONRPC_VERSION.to_string(),
id: RequestId::Number(1),
method: "tools/call".to_string(),
params: Some(serde_json::json!({
"name": "get_issues",
"arguments": {}
})),
};
let _resp = server.handle_request(req).await;
let events = buffer.drain(100).await;
assert_eq!(events.len(), 1);
assert_eq!(events[0].routing_decision, "override_rule");
assert_eq!(events[0].routing_detail.as_deref(), Some("get_*"));
assert!(events[0].upstream.is_none());
}
#[tokio::test]
async fn test_no_routing_engine_keeps_legacy_behaviour() {
let mut server = McpServer::new();
server.add_provider(Arc::new(TestProvider));
let buffer = TelemetryBuffer::new(16);
server.set_telemetry(buffer.clone());
let req = JsonRpcRequest {
jsonrpc: JSONRPC_VERSION.to_string(),
id: RequestId::Number(1),
method: "tools/call".to_string(),
params: Some(serde_json::json!({
"name": "get_issues",
"arguments": {}
})),
};
let resp = server.handle_request(req).await;
assert!(resp.error.is_none());
let events = buffer.drain(100).await;
assert_eq!(events.len(), 1);
assert_eq!(events[0].routing_decision, "legacy_local");
}
#[allow(dead_code)]
fn _unused_cfg_helper_tooldef() -> ToolDefinition {
ToolDefinition {
name: "x".into(),
description: "x".into(),
input_schema: serde_json::json!({}),
category: None,
}
}
#[test]
fn test_messenger_providers_are_scoped_to_active_context() {
let mut server = McpServer::new();
server.ensure_context("slack-context");
server.ensure_context("plain-context");
server.add_messenger_provider_to_context("slack-context", Arc::new(TestMessengerProvider));
server.set_active_context("plain-context").unwrap();
let plain_result: ToolsListResult = serde_json::from_value(
server
.handle_tools_list(RequestId::Number(1))
.result
.unwrap(),
)
.unwrap();
assert!(
!plain_result
.tools
.iter()
.any(|tool| tool.name == "get_messenger_chats")
);
server.set_active_context("slack-context").unwrap();
let slack_result: ToolsListResult = serde_json::from_value(
server
.handle_tools_list(RequestId::Number(2))
.result
.unwrap(),
)
.unwrap();
assert!(
slack_result
.tools
.iter()
.any(|tool| tool.name == "get_messenger_chats")
);
}
#[test]
fn test_add_messenger_provider_creates_context_for_activation() {
let mut server = McpServer::new();
server.add_messenger_provider_to_context("messenger-only", Arc::new(TestMessengerProvider));
assert!(
server
.context_names()
.contains(&"messenger-only".to_string())
);
assert!(server.set_active_context("messenger-only").is_ok());
}
#[tokio::test]
async fn test_deferred_init_resolves_proxy_on_tools_list() {
let mut server = McpServer::new();
server.initialized = true;
let (tx, rx) = oneshot::channel();
server.set_deferred_init(rx);
tokio::spawn(async move {
tokio::time::sleep(std::time::Duration::from_millis(50)).await;
let proxy_manager = ProxyManager::new();
let _ = tx.send(DeferredInit {
proxy_manager,
builtin_tools_config: None,
routing_engine: None,
});
});
let resp = server
.handle_request(JsonRpcRequest {
jsonrpc: JSONRPC_VERSION.to_string(),
id: RequestId::Number(1),
method: "tools/list".to_string(),
params: None,
})
.await;
assert!(resp.result.is_some());
assert!(server.deferred_init.is_none());
}
#[tokio::test]
async fn test_deferred_init_applies_builtin_tools_config() {
let mut server = McpServer::new();
server.initialized = true;
server.add_provider(Arc::new(TestProvider));
let (tx, rx) = oneshot::channel();
server.set_deferred_init(rx);
let _ = tx.send(DeferredInit {
proxy_manager: ProxyManager::new(),
builtin_tools_config: Some(BuiltinToolsConfig {
disabled: vec!["get_issues".to_string()],
enabled: vec![],
}),
routing_engine: None,
});
let resp = server
.handle_request(JsonRpcRequest {
jsonrpc: JSONRPC_VERSION.to_string(),
id: RequestId::Number(1),
method: "tools/list".to_string(),
params: None,
})
.await;
let result: ToolsListResult = serde_json::from_value(resp.result.unwrap()).unwrap();
assert!(!result.tools.iter().any(|t| t.name == "get_issues"));
assert!(result.tools.iter().any(|t| t.name == "get_issue"));
}
#[test]
fn test_enable_layered_pipeline_sets_field() {
use devboy_format_pipeline::adaptive_config::AdaptiveConfig;
let mut server = McpServer::new();
assert!(server.layered_pipeline.is_none());
server.enable_layered_pipeline(SessionPipeline::new(AdaptiveConfig::default()));
assert!(server.layered_pipeline.is_some());
server.on_compaction_boundary();
}
#[tokio::test]
async fn test_compaction_notification_advances_partition() {
use devboy_format_pipeline::adaptive_config::AdaptiveConfig;
let mut server = McpServer::new();
server.enable_layered_pipeline(SessionPipeline::new(AdaptiveConfig::default()));
server.handle_notification("notifications/devboy/compact");
server.handle_notification("notifications/totally/unrelated");
}
#[tokio::test]
async fn test_compact_pipeline_cache_internal_tool() {
use devboy_format_pipeline::adaptive_config::AdaptiveConfig;
let mut server = McpServer::new();
server.enable_layered_pipeline(SessionPipeline::new(AdaptiveConfig::default()));
let req = JsonRpcRequest {
jsonrpc: JSONRPC_VERSION.to_string(),
id: RequestId::Number(1),
method: "tools/call".to_string(),
params: Some(serde_json::json!({
"name": "compact_pipeline_cache",
"arguments": {}
})),
};
let resp = server.handle_request(req).await;
assert!(resp.error.is_none());
let result: ToolCallResult = serde_json::from_value(resp.result.unwrap()).unwrap();
assert_eq!(result.is_error, None);
}
#[tokio::test]
async fn test_e2e_read_edit_read_busts_cache_via_server() {
use devboy_format_pipeline::adaptive_config::AdaptiveConfig;
let pipeline = SessionPipeline::new(AdaptiveConfig::default());
let body = "x".repeat(600);
let read_params = crate::protocol::ToolCallParams {
name: "Read".to_string(),
arguments: Some(serde_json::json!({"file_path": "/tmp/e2e.rs"})),
};
let r1 = pipeline.process("req_1", &read_params, ToolCallResult::text(body.clone()), 0);
let crate::protocol::ToolResultContent::Text { text: t1 } = &r1.content[0];
assert_eq!(t1, &body);
let edit_params = crate::protocol::ToolCallParams {
name: "Edit".to_string(),
arguments: Some(serde_json::json!({"file_path": "/tmp/e2e.rs"})),
};
if crate::layered::is_mutating_tool(&edit_params.name)
&& let Some(p) = crate::layered::extract_file_path(edit_params.arguments.as_ref())
{
pipeline.invalidate_file(&p);
}
let r3 = pipeline.process(
"req_3",
&read_params,
ToolCallResult::text(body.clone()),
10,
);
let crate::protocol::ToolResultContent::Text { text: t3 } = &r3.content[0];
assert_eq!(
t3, &body,
"Edit must bust the dedup cache so subsequent Read is fresh"
);
}
#[tokio::test]
async fn test_speculate_after_runs_when_enrichment_enabled() {
use devboy_format_pipeline::adaptive_config::AdaptiveConfig;
use std::sync::Arc;
struct StubDispatcher;
#[async_trait::async_trait]
impl crate::speculation::PrefetchDispatcher for StubDispatcher {
async fn dispatch(
&self,
_tool_name: &str,
_args: serde_json::Value,
) -> Result<String, crate::speculation::PrefetchError> {
Ok("prefetched body".to_string())
}
}
let mut cfg = AdaptiveConfig {
tools: devboy_format_pipeline::tool_defaults::default_tool_value_models(),
..AdaptiveConfig::default()
};
cfg.enrichment.enabled = true;
cfg.enrichment.prefetch_timeout_ms = 200;
cfg.enrichment.prefetch_budget_tokens = 4_000;
let pipeline = SessionPipeline::new(cfg)
.with_speculation(Arc::new(StubDispatcher))
.await;
let mut server = McpServer::new();
server.enable_layered_pipeline(pipeline);
let _ = server
.handle_request(JsonRpcRequest {
jsonrpc: JSONRPC_VERSION.to_string(),
id: RequestId::Number(1),
method: "tools/call".to_string(),
params: Some(serde_json::json!({
"name": "Glob",
"arguments": {"pattern": "src/**/*.rs"}
})),
})
.await;
let snap = server
.layered_pipeline
.as_ref()
.unwrap()
.enrichment_snapshot();
assert!(snap.total_prefetches < 100, "sanity bound");
}
#[tokio::test]
async fn test_fail_fast_short_circuits_dispatch() {
use devboy_format_pipeline::adaptive_config::AdaptiveConfig;
let mut cfg = AdaptiveConfig {
tools: devboy_format_pipeline::tool_defaults::default_tool_value_models(),
..AdaptiveConfig::default()
};
cfg.enrichment.enabled = false;
let pipeline = SessionPipeline::new(cfg);
let empty_params = crate::protocol::ToolCallParams {
name: "ToolSearch".to_string(),
arguments: None,
};
for i in 0..2 {
pipeline.process(
&format!("rid_{i}"),
&empty_params,
ToolCallResult::text(String::new()),
i,
);
}
assert!(
pipeline.should_skip("ToolSearch"),
"circuit must be armed after 2 empty responses"
);
let pre_count = pipeline
.enrichment_snapshot()
.inference_calls_saved_fail_fast;
let mut server = McpServer::new();
server.enable_layered_pipeline(pipeline);
let resp = server
.handle_request(JsonRpcRequest {
jsonrpc: JSONRPC_VERSION.to_string(),
id: RequestId::Number(99),
method: "tools/call".to_string(),
params: Some(serde_json::json!({
"name": "ToolSearch",
"arguments": {"query": "anything"}
})),
})
.await;
assert!(resp.error.is_none(), "fail-fast must succeed, not error");
let result = resp.result.expect("must carry a result");
let body = result["content"][0]["text"].as_str().expect("text content");
assert!(
body.contains("fail-fast"),
"expected fail-fast hint, got: {body}"
);
let post_count = server
.layered_pipeline
.as_ref()
.unwrap()
.enrichment_snapshot()
.inference_calls_saved_fail_fast;
assert_eq!(
post_count,
pre_count + 1,
"fail-fast must record the saved call"
);
}
#[tokio::test]
async fn test_layered_pipeline_dedups_repeated_internal_tool_response() {
use devboy_format_pipeline::adaptive_config::AdaptiveConfig;
let mut server = McpServer::new();
server.contexts.insert("workspace".to_string(), vec![]);
server.set_active_context("workspace").unwrap();
server.enable_layered_pipeline(SessionPipeline::new(AdaptiveConfig::default()));
let make_req = |id: i64| JsonRpcRequest {
jsonrpc: JSONRPC_VERSION.to_string(),
id: RequestId::Number(id),
method: "tools/call".to_string(),
params: Some(serde_json::json!({
"name": "get_current_context",
"arguments": {}
})),
};
let r1 = server.handle_request(make_req(1)).await;
let r2 = server.handle_request(make_req(2)).await;
assert!(r1.error.is_none());
assert!(r2.error.is_none());
assert_eq!(r1.result, r2.result);
}
}