1use std::collections::HashMap;
9use std::sync::{Arc, RwLock};
10use std::time::Instant;
11
12use devboy_core::{BuiltinToolsConfig, Provider};
13use serde::Deserialize;
14use serde_json::Value;
15use tokio::sync::oneshot;
16
17use crate::layered::{SessionPipeline, extract_file_path as file_path_from_args, is_mutating_tool};
18use crate::protocol::{
19 InitializeParams, InitializeResult, JsonRpcError, JsonRpcRequest, JsonRpcResponse, MCP_VERSION,
20 RequestId, ServerCapabilities, ServerInfo, ToolCallParams, ToolCallResult, ToolsCapability,
21 ToolsListResult,
22};
23use crate::proxy::ProxyManager;
24use crate::routing::{RoutingEngine, RoutingTarget};
25use crate::telemetry::{TelemetryBuffer, TelemetryEvent, TelemetryStatus};
26use crate::transport::{IncomingMessage, StdioTransport};
27
28pub struct DeferredInit {
30 pub proxy_manager: ProxyManager,
32 pub builtin_tools_config: Option<BuiltinToolsConfig>,
34 pub routing_engine: Option<Arc<RoutingEngine>>,
37}
38
39pub struct McpServer {
40 contexts: HashMap<String, Vec<Arc<dyn Provider>>>,
41 knowledge_base_contexts: HashMap<String, Vec<Arc<dyn devboy_core::KnowledgeBaseProvider>>>,
42 messenger_contexts: HashMap<String, Vec<Arc<dyn devboy_core::MessengerProvider>>>,
43 active_context: RwLock<String>,
44 initialized: bool,
45 proxy_manager: ProxyManager,
46 builtin_tools_config: BuiltinToolsConfig,
47 meeting_providers: Vec<Arc<dyn devboy_core::MeetingNotesProvider>>,
48 routing_engine: Option<Arc<RoutingEngine>>,
51 telemetry: Option<TelemetryBuffer>,
53 deferred_init: Option<oneshot::Receiver<DeferredInit>>,
56 layered_pipeline: Option<SessionPipeline>,
60}
61
62impl McpServer {
63 pub fn new() -> Self {
65 let mut contexts = HashMap::new();
66 contexts.insert("default".to_string(), Vec::new());
67 let mut knowledge_base_contexts = HashMap::new();
68 knowledge_base_contexts.insert("default".to_string(), Vec::new());
69 let mut messenger_contexts = HashMap::new();
70 messenger_contexts.insert("default".to_string(), Vec::new());
71 Self {
72 contexts,
73 knowledge_base_contexts,
74 messenger_contexts,
75 active_context: RwLock::new("default".to_string()),
76 initialized: false,
77 proxy_manager: ProxyManager::new(),
78 builtin_tools_config: BuiltinToolsConfig::default(),
79 meeting_providers: Vec::new(),
80 routing_engine: None,
81 telemetry: None,
82 deferred_init: None,
83 layered_pipeline: None,
84 }
85 }
86
87 pub fn enable_layered_pipeline(&mut self, pipeline: SessionPipeline) {
91 self.layered_pipeline = Some(pipeline);
92 tracing::info!(
93 "Paper 2 layered pipeline enabled — L0 dedup active. \
94 Edit ~/.devboy/pipeline_config.toml (or set DEVBOY_PIPELINE_CONFIG) \
95 to tune knobs. See `devboy tune analyze` for split-savings metrics."
96 );
97 }
98
99 pub fn on_compaction_boundary(&self) {
101 if let Some(p) = &self.layered_pipeline {
102 p.on_compaction_boundary();
103 }
104 }
105
106 pub fn set_routing_engine(&mut self, engine: Arc<RoutingEngine>) {
109 self.routing_engine = Some(engine);
110 }
111
112 pub fn set_telemetry(&mut self, buffer: TelemetryBuffer) {
114 self.telemetry = Some(buffer);
115 }
116
117 pub fn set_builtin_tools_config(
121 &mut self,
122 config: BuiltinToolsConfig,
123 ) -> devboy_core::Result<()> {
124 config.validate()?;
125 self.builtin_tools_config = config;
126 Ok(())
127 }
128
129 pub fn set_proxy_manager(&mut self, proxy_manager: ProxyManager) {
131 self.proxy_manager = proxy_manager;
132 }
133
134 pub fn set_deferred_init(&mut self, receiver: oneshot::Receiver<DeferredInit>) {
139 self.deferred_init = Some(receiver);
140 }
141
142 async fn resolve_deferred_init(&mut self) {
146 if let Some(receiver) = self.deferred_init.take() {
147 match receiver.await {
148 Ok(init) => {
149 if !init.proxy_manager.is_empty() {
150 self.proxy_manager = init.proxy_manager;
151 }
152 if let Some(bt_config) = init.builtin_tools_config
153 && !bt_config.is_empty()
154 {
155 if let Err(e) = bt_config.validate() {
156 tracing::warn!("Remote builtin_tools config is invalid, ignoring: {e}");
157 } else {
158 self.builtin_tools_config = bt_config;
159 }
160 }
161 if let Some(engine) = init.routing_engine {
162 self.routing_engine = Some(engine);
163 }
164 }
165 Err(_) => {
166 tracing::warn!("Deferred initialization was cancelled");
167 }
168 }
169 }
170 }
171
172 pub fn add_meeting_provider(&mut self, provider: Arc<dyn devboy_core::MeetingNotesProvider>) {
173 self.meeting_providers.push(provider);
174 }
175
176 pub fn add_knowledge_base_provider(
177 &mut self,
178 provider: Arc<dyn devboy_core::KnowledgeBaseProvider>,
179 ) {
180 self.add_knowledge_base_provider_to_context("default", provider);
181 }
182
183 pub fn add_knowledge_base_provider_to_context(
184 &mut self,
185 context: &str,
186 provider: Arc<dyn devboy_core::KnowledgeBaseProvider>,
187 ) {
188 self.contexts.entry(context.to_string()).or_default();
189 self.knowledge_base_contexts
190 .entry(context.to_string())
191 .or_default()
192 .push(provider);
193 }
194
195 pub fn add_messenger_provider(&mut self, provider: Arc<dyn devboy_core::MessengerProvider>) {
196 self.add_messenger_provider_to_context("default", provider);
197 }
198
199 pub fn add_messenger_provider_to_context(
200 &mut self,
201 context: &str,
202 provider: Arc<dyn devboy_core::MessengerProvider>,
203 ) {
204 self.contexts.entry(context.to_string()).or_default();
205 self.messenger_contexts
206 .entry(context.to_string())
207 .or_default()
208 .push(provider);
209 }
210
211 pub fn add_provider(&mut self, provider: Arc<dyn Provider>) {
212 self.contexts
213 .entry("default".to_string())
214 .or_default()
215 .push(provider);
216 }
217
218 pub fn add_provider_to_context(&mut self, context: &str, provider: Arc<dyn Provider>) {
220 self.contexts
221 .entry(context.to_string())
222 .or_default()
223 .push(provider);
224 }
225
226 pub fn ensure_context(&mut self, context: &str) {
228 self.contexts.entry(context.to_string()).or_default();
229 self.knowledge_base_contexts
230 .entry(context.to_string())
231 .or_default();
232 self.messenger_contexts
233 .entry(context.to_string())
234 .or_default();
235 }
236
237 pub fn set_active_context(&self, context: &str) -> devboy_core::Result<()> {
238 if !self.contexts.contains_key(context) {
239 return Err(devboy_core::Error::Config(format!(
240 "Context '{}' not found",
241 context
242 )));
243 }
244
245 let mut active = self
246 .active_context
247 .write()
248 .map_err(|_| devboy_core::Error::Config("Active context lock poisoned".to_string()))?;
249 *active = context.to_string();
250 Ok(())
251 }
252
253 pub fn active_context_name(&self) -> String {
255 self.active_context
256 .read()
257 .map(|g| g.clone())
258 .unwrap_or_else(|_| "default".to_string())
259 }
260
261 pub fn context_names(&self) -> Vec<String> {
263 let mut names: Vec<String> = self.contexts.keys().cloned().collect();
264 names.sort();
265 names
266 }
267
268 pub fn active_providers(&self) -> Vec<Arc<dyn Provider>> {
270 let active = self.active_context_name();
271 self.contexts.get(&active).cloned().unwrap_or_default()
272 }
273
274 pub fn active_knowledge_base_providers(
276 &self,
277 ) -> Vec<Arc<dyn devboy_core::KnowledgeBaseProvider>> {
278 let active = self.active_context_name();
279 self.knowledge_base_contexts
280 .get(&active)
281 .cloned()
282 .unwrap_or_default()
283 }
284
285 pub fn active_messenger_providers(&self) -> Vec<Arc<dyn devboy_core::MessengerProvider>> {
287 let active = self.active_context_name();
288 self.messenger_contexts
289 .get(&active)
290 .cloned()
291 .unwrap_or_default()
292 }
293
294 pub fn providers(&self) -> &[Arc<dyn Provider>] {
296 self.contexts
297 .get("default")
298 .map(Vec::as_slice)
299 .unwrap_or(&[])
300 }
301
302 pub async fn run(&mut self) -> devboy_core::Result<()> {
304 tracing::info!(
305 "Starting MCP server with {} contexts (active: {})",
306 self.contexts.len(),
307 self.active_context_name()
308 );
309
310 let mut transport = StdioTransport::stdio();
311
312 loop {
313 match transport.read_message() {
314 Ok(Some(msg)) => {
315 let response = self.handle_message(msg).await;
316 if let Some(resp) = response
317 && let Err(e) = transport.write_response(&resp)
318 {
319 tracing::error!("Failed to write response: {}", e);
320 break;
321 }
322 }
323 Ok(None) => {
324 tracing::info!("EOF received, shutting down");
325 break;
326 }
327 Err(e) => {
328 tracing::error!("Transport error: {}", e);
329 let error_resp = JsonRpcResponse::error(
331 RequestId::Null,
332 JsonRpcError::parse_error(&e.to_string()),
333 );
334 let _ = transport.write_response(&error_resp);
335 }
336 }
337 }
338
339 tracing::info!("MCP server stopped");
340 Ok(())
341 }
342
343 async fn handle_message(&mut self, msg: IncomingMessage) -> Option<JsonRpcResponse> {
345 match msg {
346 IncomingMessage::Request(req) => Some(self.handle_request(req).await),
347 IncomingMessage::Notification(notif) => {
348 self.handle_notification(¬if.method);
349 None }
351 }
352 }
353
354 pub async fn handle_request(&mut self, req: JsonRpcRequest) -> JsonRpcResponse {
356 tracing::debug!("Handling request: {} (id: {:?})", req.method, req.id);
357
358 match req.method.as_str() {
359 "initialize" => self.handle_initialize(req.id, req.params),
360 "tools/list" => {
361 self.resolve_deferred_init().await;
362 self.handle_tools_list(req.id)
363 }
364 "tools/call" => {
365 self.resolve_deferred_init().await;
366 self.handle_tools_call(req.id, req.params).await
367 }
368 "ping" => self.handle_ping(req.id),
369 method => {
370 tracing::warn!("Unknown method: {}", method);
371 JsonRpcResponse::error(req.id, JsonRpcError::method_not_found(method))
372 }
373 }
374 }
375
376 fn handle_notification(&mut self, method: &str) {
378 match method {
379 "initialized" => {
380 tracing::info!("Client initialized");
381 }
382 "notifications/cancelled" => {
383 tracing::debug!("Request cancelled by client");
384 }
385 "notifications/devboy/compact" => {
391 tracing::info!("Host compaction signal received — advancing dedup partition");
392 self.on_compaction_boundary();
393 }
394 _ => {
395 tracing::debug!("Ignoring notification: {}", method);
396 }
397 }
398 }
399
400 fn handle_initialize(&mut self, id: RequestId, params: Option<Value>) -> JsonRpcResponse {
401 if self.initialized {
402 return JsonRpcResponse::error(
403 id,
404 JsonRpcError::invalid_request("Server already initialized"),
405 );
406 }
407
408 if let Some(params) = params {
410 match serde_json::from_value::<InitializeParams>(params) {
411 Ok(init_params) => {
412 tracing::info!(
413 "Client: {} v{} (protocol: {})",
414 init_params.client_info.name,
415 init_params.client_info.version,
416 init_params.protocol_version
417 );
418 }
419 Err(e) => {
420 tracing::warn!("Failed to parse initialize params: {}", e);
421 }
422 }
423 }
424
425 self.initialized = true;
426
427 let result = InitializeResult {
428 protocol_version: MCP_VERSION.to_string(),
429 capabilities: ServerCapabilities {
430 tools: Some(ToolsCapability {
431 list_changed: false,
432 }),
433 resources: None,
434 prompts: None,
435 },
436 server_info: ServerInfo {
437 name: "devboy-mcp".to_string(),
438 version: env!("CARGO_PKG_VERSION").to_string(),
439 },
440 };
441
442 JsonRpcResponse::success(id, serde_json::to_value(result).unwrap())
443 }
444
445 pub fn handle_tools_list(&self, id: RequestId) -> JsonRpcResponse {
450 let providers = self.active_providers();
451
452 let base_tools = devboy_executor::tools::base_tool_definitions();
454 let mut tools: Vec<crate::protocol::ToolDefinition> = base_tools
455 .into_iter()
456 .map(|t| {
457 let mut schema = serde_json::to_value(&t.input_schema).unwrap_or_default();
458 if let Some(obj) = schema.as_object_mut() {
460 obj.entry("type").or_insert_with(|| "object".into());
461 }
462 crate::protocol::ToolDefinition {
463 name: t.name,
464 description: t.description,
465 input_schema: schema,
466 category: Some(t.category),
467 }
468 })
469 .collect();
470
471 use devboy_core::IssueProvider;
473 let has_issue_providers = !providers.is_empty();
474 let has_mr_providers = providers.iter().any(|p| {
475 matches!(
476 IssueProvider::provider_name(p.as_ref()),
477 "github" | "gitlab"
478 )
479 });
480 let has_jira_provider = providers
484 .iter()
485 .any(|p| IssueProvider::provider_name(p.as_ref()) == "jira");
486 let has_meeting_providers = !self.meeting_providers.is_empty();
487 let has_knowledge_base_providers = !self.active_knowledge_base_providers().is_empty();
488 let has_messenger_providers = !self.active_messenger_providers().is_empty();
489
490 let any_upload = providers
493 .iter()
494 .any(|p| p.asset_capabilities().issue.upload);
495 let any_delete = providers
496 .iter()
497 .any(|p| p.asset_capabilities().issue.delete);
498
499 tools.retain(|t| {
502 match t.name.as_str() {
504 "upload_asset" => return any_upload,
505 "delete_asset" => return any_delete,
506 _ => {}
507 }
508 t.category
509 .map(|cat| match cat {
510 devboy_core::ToolCategory::IssueTracker => has_issue_providers,
511 devboy_core::ToolCategory::Epics => has_issue_providers,
512 devboy_core::ToolCategory::GitRepository => has_mr_providers,
513 devboy_core::ToolCategory::MeetingNotes => has_meeting_providers,
514 devboy_core::ToolCategory::KnowledgeBase => has_knowledge_base_providers,
515 devboy_core::ToolCategory::Messenger => has_messenger_providers,
516 devboy_core::ToolCategory::Releases => has_mr_providers,
517 devboy_core::ToolCategory::JiraStructure => has_jira_provider,
518 })
519 .unwrap_or(true) });
521
522 for tool in devboy_executor::tools::mcp_only_tools() {
526 let mut schema = serde_json::to_value(&tool.input_schema)
530 .expect("McpOnlyTool::input_schema must be serializable");
531 if let Some(obj) = schema.as_object_mut() {
532 obj.entry("type").or_insert_with(|| "object".into());
533 }
534 tools.push(crate::protocol::ToolDefinition {
535 name: tool.name,
536 description: tool.description,
537 input_schema: schema,
538 category: None,
539 });
540 }
541
542 if !self.builtin_tools_config.is_empty() {
544 tools.retain(|t| self.builtin_tools_config.is_tool_allowed(&t.name));
545 }
546
547 tools.extend(self.proxy_manager.all_tools());
549
550 let result = ToolsListResult { tools };
551 JsonRpcResponse::success(id, serde_json::to_value(result).unwrap())
552 }
553
554 async fn handle_tools_call(&mut self, id: RequestId, params: Option<Value>) -> JsonRpcResponse {
555 let params: ToolCallParams = match params {
556 Some(p) => match serde_json::from_value(p) {
557 Ok(params) => params,
558 Err(e) => {
559 return JsonRpcResponse::error(
560 id,
561 JsonRpcError::invalid_params(&e.to_string()),
562 );
563 }
564 },
565 None => {
566 return JsonRpcResponse::error(id, JsonRpcError::invalid_params("Missing params"));
567 }
568 };
569
570 tracing::info!("Calling tool: {}", params.name);
571
572 if !self.builtin_tools_config.is_empty()
574 && !self.builtin_tools_config.is_tool_allowed(¶ms.name)
575 && !self.proxy_manager.has_tool(¶ms.name)
576 {
577 return JsonRpcResponse::error(
578 id,
579 JsonRpcError::method_not_found(&format!(
580 "Tool '{}' is disabled by builtin_tools configuration",
581 params.name
582 )),
583 );
584 }
585
586 if let Some(result) = self.handle_internal_tool(¶ms).await {
588 return JsonRpcResponse::success(id, serde_json::to_value(result).unwrap());
589 }
590
591 if let Some(pipeline) = &self.layered_pipeline
595 && is_mutating_tool(¶ms.name)
596 && let Some(path) = file_path_from_args(params.arguments.as_ref())
597 {
598 pipeline.invalidate_file(&path);
599 }
600
601 if let Some(pipeline) = &self.layered_pipeline
607 && pipeline.should_skip(¶ms.name)
608 {
609 pipeline.record_fail_fast_skip(40);
615 let hint = format!(
616 "> [enrichment: '{}' fail-fast — last calls returned 0 bytes; planner refuses to re-issue. Try a different query.]",
617 params.name
618 );
619 return JsonRpcResponse::success(
620 id,
621 serde_json::to_value(ToolCallResult::text(hint)).unwrap(),
622 );
623 }
624
625 let started = Instant::now();
626 let (result, was_fallback, emitted_reason, emitted_detail, upstream_label, resolved_name) =
627 self.dispatch_with_routing(¶ms).await;
628
629 let result = if let Some(pipeline) = &self.layered_pipeline {
631 let req_id = match &id {
632 RequestId::Number(n) => format!("req_{n}"),
633 RequestId::String(s) => s.clone(),
634 RequestId::Null => "req_null".to_string(),
635 };
636 let ts_ms = std::time::SystemTime::now()
637 .duration_since(std::time::UNIX_EPOCH)
638 .map(|d| d.as_millis() as i64)
639 .unwrap_or(0);
640 pipeline.process(&req_id, ¶ms, result, ts_ms)
641 } else {
642 result
643 };
644
645 let result = if let Some(pipeline) = &self.layered_pipeline
655 && result.is_error != Some(true)
656 {
657 let prev_json = result
658 .content
659 .first()
660 .map(|c| {
661 let crate::protocol::ToolResultContent::Text { text } = c;
662 serde_json::Value::String(text.clone())
663 })
664 .unwrap_or(serde_json::Value::Null);
665 let hint = pipeline.speculate_after(¶ms.name, &prev_json).await;
666 if !hint.is_empty() {
667 let mut new_result = result.clone();
670 if let Some(last) = new_result.content.last_mut() {
671 let crate::protocol::ToolResultContent::Text { text } = last;
672 text.push_str(&hint);
673 }
674 new_result
675 } else {
676 result
677 }
678 } else {
679 result
680 };
681
682 if let Some(buffer) = &self.telemetry {
684 let latency_ms = started.elapsed().as_millis() as u64;
685 let status = if result.is_error == Some(true) {
686 TelemetryStatus::Error
687 } else {
688 TelemetryStatus::Success
689 };
690 let mut event = TelemetryEvent::now(&resolved_name, emitted_reason);
694 event.routing_detail = emitted_detail;
695 event.upstream = upstream_label;
696 event.status = status;
697 event.latency_ms = latency_ms;
698 event.was_fallback = was_fallback;
699 buffer.record(event).await;
700 }
701
702 JsonRpcResponse::success(id, serde_json::to_value(result).unwrap())
703 }
704
705 async fn handle_internal_tool(&self, params: &ToolCallParams) -> Option<ToolCallResult> {
707 match params.name.as_str() {
708 "list_contexts" => {
709 let active = self.active_context_name();
710 let names = self.context_names();
711 let content = names
712 .into_iter()
713 .map(|name| {
714 if name == active {
715 format!("* {} (active)", name)
716 } else {
717 format!("* {}", name)
718 }
719 })
720 .collect::<Vec<_>>()
721 .join("\n");
722 Some(ToolCallResult::text(content))
723 }
724 "get_current_context" => Some(ToolCallResult::text(self.active_context_name())),
725 "compact_pipeline_cache" => {
726 self.on_compaction_boundary();
730 Some(ToolCallResult::text(
731 "pipeline cache partition advanced".to_string(),
732 ))
733 }
734 "use_context" => {
735 #[derive(Deserialize)]
736 struct UseContextParams {
737 name: String,
738 }
739 Some(match ¶ms.arguments {
740 Some(args) => match serde_json::from_value::<UseContextParams>(args.clone()) {
741 Ok(args) => match self.set_active_context(&args.name) {
742 Ok(()) => ToolCallResult::text(format!(
743 "Active context set to '{}'",
744 args.name
745 )),
746 Err(e) => ToolCallResult::error(e.to_string()),
747 },
748 Err(e) => ToolCallResult::error(format!("Invalid parameters: {}", e)),
749 },
750 None => ToolCallResult::error("Missing required parameter: name".to_string()),
751 })
752 }
753 _ => None,
754 }
755 }
756
757 async fn dispatch_with_routing(
763 &self,
764 params: &ToolCallParams,
765 ) -> (
766 ToolCallResult,
767 bool,
768 String,
769 Option<String>,
770 Option<String>,
771 String,
772 ) {
773 let Some(engine) = self.routing_engine.clone() else {
777 let result = self.legacy_dispatch(params).await;
778 let (reason, resolved) = if self.proxy_manager.has_tool(¶ms.name) {
779 let stripped = params
781 .name
782 .split_once("__")
783 .map(|(_, rest)| rest.to_string())
784 .unwrap_or_else(|| params.name.clone());
785 ("legacy_remote", stripped)
786 } else {
787 ("legacy_local", params.name.clone())
788 };
789 return (result, false, reason.to_string(), None, None, resolved);
790 };
791
792 let decision = engine.decide(¶ms.name);
793 let reason_label = decision.reason.as_label().to_string();
794 let reason_detail = decision.reason.detail().map(String::from);
795 let resolved_name = decision.resolved_name.clone();
796
797 let primary = decision.primary.clone();
798 let result = self
799 .execute_target(&primary, &decision.resolved_name, params.arguments.clone())
800 .await;
801
802 let upstream_label = match &primary {
803 RoutingTarget::Remote { prefix, .. } => Some(prefix.clone()),
804 _ => None,
805 };
806
807 if result.is_error == Some(true)
808 && let Some(fallback) = &decision.fallback
809 {
810 tracing::warn!(
811 tool = params.name.as_str(),
812 primary_target = ?primary,
813 "primary executor errored; retrying via fallback"
814 );
815 let fb_result = self
816 .execute_target(fallback, &decision.resolved_name, params.arguments.clone())
817 .await;
818 let fb_upstream = match fallback {
819 RoutingTarget::Remote { prefix, .. } => Some(prefix.clone()),
820 _ => None,
821 };
822 return (
823 fb_result,
824 true,
825 reason_label,
826 reason_detail,
827 fb_upstream,
828 resolved_name,
829 );
830 }
831
832 (
833 result,
834 false,
835 reason_label,
836 reason_detail,
837 upstream_label,
838 resolved_name,
839 )
840 }
841
842 async fn execute_target(
847 &self,
848 target: &RoutingTarget,
849 unprefixed_name: &str,
850 arguments: Option<Value>,
851 ) -> ToolCallResult {
852 match target {
853 RoutingTarget::Local => self.dispatch_builtin_tool(unprefixed_name, arguments).await,
854 RoutingTarget::Remote {
855 prefix,
856 original_name,
857 } => self
858 .proxy_manager
859 .call_by_prefix(prefix, original_name, arguments)
860 .await
861 .unwrap_or_else(|| {
862 ToolCallResult::error(format!(
863 "No upstream MCP server connected with prefix '{}'",
864 prefix
865 ))
866 }),
867 RoutingTarget::Reject => ToolCallResult::error(format!(
868 "Tool '{}' is not available (unknown to both local and remote catalogues)",
869 unprefixed_name
870 )),
871 }
872 }
873
874 pub async fn execute_for_prefetch(
895 &self,
896 name: &str,
897 arguments: Option<Value>,
898 ) -> ToolCallResult {
899 if !self.builtin_tools_config.is_empty()
902 && !self.builtin_tools_config.is_tool_allowed(name)
903 && !self.proxy_manager.has_tool(name)
904 {
905 return ToolCallResult::error(format!(
906 "Tool '{name}' is disabled by builtin_tools configuration"
907 ));
908 }
909
910 if Self::is_internal_tool(name) {
913 return ToolCallResult::error(format!(
914 "Tool '{name}' is internal — never speculatable"
915 ));
916 }
917
918 let params = ToolCallParams {
919 name: name.to_string(),
920 arguments,
921 };
922 match self.routing_engine.clone() {
925 Some(engine) => {
926 let decision = engine.decide(name);
927 self.execute_target(&decision.primary, &decision.resolved_name, params.arguments)
928 .await
929 }
930 None => self.legacy_dispatch(¶ms).await,
931 }
932 }
933
/// Reports whether `name` is a tool the server serves itself.
///
/// The list covers every name matched by `handle_internal_tool`
/// (`list_contexts`, `get_current_context`, `compact_pipeline_cache`,
/// `use_context`) plus `switch_context`, which was already listed here.
/// These tools read or change server state, so the prefetch path refuses them.
fn is_internal_tool(name: &str) -> bool {
    matches!(
        name,
        "use_context"
            | "list_contexts"
            | "get_current_context"
            | "switch_context"
            | "compact_pipeline_cache"
    )
}
942
943 async fn legacy_dispatch(&self, params: &ToolCallParams) -> ToolCallResult {
946 if let Some(result) = self
947 .proxy_manager
948 .try_call(¶ms.name, params.arguments.clone())
949 .await
950 {
951 return result;
952 }
953 self.dispatch_builtin_tool(¶ms.name, params.arguments.clone())
954 .await
955 }
956
957 async fn dispatch_builtin_tool(&self, name: &str, arguments: Option<Value>) -> ToolCallResult {
965 let executor = self.create_executor();
966 let args = arguments.unwrap_or(Value::Null);
967 let category = devboy_executor::Executor::tool_category(name);
968
969 match category {
970 Some(devboy_core::ToolCategory::MeetingNotes) => {
971 for provider in &self.meeting_providers {
972 match executor
973 .execute_direct_meeting(name, args.clone(), provider.as_ref())
974 .await
975 {
976 Ok(output) => return output_to_result(output),
977 Err(e) => {
978 tracing::debug!("Meeting provider failed: {}", e);
979 continue;
980 }
981 }
982 }
983 ToolCallResult::error(format!("No meeting provider supports '{}'", name))
984 }
985 Some(devboy_core::ToolCategory::Messenger) => {
986 for provider in &self.active_messenger_providers() {
987 match executor
988 .execute_direct_messenger(name, args.clone(), provider.as_ref())
989 .await
990 {
991 Ok(output) => return output_to_result(output),
992 Err(e) => {
993 tracing::debug!("Messenger provider failed: {}", e);
994 continue;
995 }
996 }
997 }
998 ToolCallResult::error(format!("No messenger provider supports '{}'", name))
999 }
1000 Some(devboy_core::ToolCategory::KnowledgeBase) => {
1001 for provider in &self.active_knowledge_base_providers() {
1002 match executor
1003 .execute_direct_knowledge_base(name, args.clone(), provider.as_ref())
1004 .await
1005 {
1006 Ok(output) => return output_to_result(output),
1007 Err(e) => {
1008 tracing::debug!("Knowledge base provider failed: {}", e);
1009 continue;
1010 }
1011 }
1012 }
1013 ToolCallResult::error(format!("No knowledge base provider supports '{}'", name))
1014 }
1015 _ => {
1016 let providers = self.active_providers();
1018 if providers.is_empty() {
1019 return ToolCallResult::error("No providers configured".to_string());
1020 }
1021 for provider in &providers {
1022 match executor
1023 .execute_direct(name, args.clone(), provider.as_ref())
1024 .await
1025 {
1026 Ok(output) => return output_to_result(output),
1027 Err(e) if should_try_next_provider(&e) => continue,
1028 Err(e) => return ToolCallResult::error(format!("{e}")),
1029 }
1030 }
1031 ToolCallResult::error(format!("No provider supports '{}'", name))
1032 }
1033 }
1034 }
1035
1036 fn create_executor(&self) -> devboy_executor::Executor {
1038 let mut executor = devboy_executor::Executor::new();
1039 if let Ok(mgr) =
1041 devboy_assets::AssetManager::from_config(devboy_assets::AssetConfig::default())
1042 {
1043 executor = executor.with_asset_manager(mgr);
1044 }
1045 if !self.active_knowledge_base_providers().is_empty() {
1046 executor.add_enricher(Box::new(devboy_confluence::ConfluenceSchemaEnricher::new()));
1047 }
1048 executor
1049 }
1050
1051 fn handle_ping(&self, id: RequestId) -> JsonRpcResponse {
1052 JsonRpcResponse::success(id, serde_json::json!({}))
1053 }
1054}
1055
1056fn output_to_result(output: devboy_executor::ToolOutput) -> ToolCallResult {
1058 match devboy_executor::format_output(output, None, None, None) {
1059 Ok(formatted) => ToolCallResult::text(formatted.content),
1060 Err(e) => ToolCallResult::error(format!("Format error: {e}")),
1061 }
1062}
1063
1064fn should_try_next_provider(e: &devboy_core::Error) -> bool {
1071 if matches!(
1074 e,
1075 devboy_core::Error::ProviderUnsupported { .. } | devboy_core::Error::ProviderNotFound(_)
1076 ) {
1077 return true;
1078 }
1079
1080 if let devboy_core::Error::InvalidData(msg) = e {
1090 let lower = msg.to_ascii_lowercase();
1091 let is_key_prefix_mismatch = (lower.contains("invalid") && lower.contains("key"))
1092 || lower.contains("unsupported key prefix");
1093 if is_key_prefix_mismatch {
1094 return true;
1095 }
1096 }
1097
1098 false
1099}
1100
1101impl Default for McpServer {
1102 fn default() -> Self {
1103 Self::new()
1104 }
1105}
1106
1107#[cfg(test)]
1108mod tests {
1109 use super::*;
1110 use crate::protocol::{JSONRPC_VERSION, RequestId, ToolCallResult, ToolResultContent};
1111
1112 #[test]
1113 fn should_try_next_provider_retries_unsupported_and_not_found() {
1114 assert!(should_try_next_provider(
1115 &devboy_core::Error::ProviderUnsupported {
1116 provider: "github".into(),
1117 operation: "get_structures".into(),
1118 }
1119 ));
1120 assert!(should_try_next_provider(
1121 &devboy_core::Error::ProviderNotFound("jira".into())
1122 ));
1123 }
1124
1125 #[test]
1126 fn should_try_next_provider_retries_key_prefix_mismatch_invalid_data() {
1127 assert!(should_try_next_provider(&devboy_core::Error::InvalidData(
1133 "Invalid issue key: gitlab#1".into()
1134 )));
1135 assert!(should_try_next_provider(&devboy_core::Error::InvalidData(
1136 "Invalid PR key: gh#7".into()
1137 )));
1138 assert!(should_try_next_provider(&devboy_core::Error::InvalidData(
1139 "Invalid mr key: pr#4".into()
1140 )));
1141 }
1142
1143 #[test]
1144 fn should_try_next_provider_bubbles_up_real_errors() {
1145 assert!(!should_try_next_provider(&devboy_core::Error::NotFound(
1149 "No workflow runs found for branch 'main'".into()
1150 )));
1151 assert!(!should_try_next_provider(&devboy_core::Error::Http(
1152 "500 Internal Server Error".into()
1153 )));
1154 assert!(!should_try_next_provider(
1155 &devboy_core::Error::Unauthorized("Bad credentials".into())
1156 ));
1157 assert!(!should_try_next_provider(&devboy_core::Error::InvalidData(
1161 "invalid get_issues params: expected string, found integer".into()
1162 )));
1163 }
1164
1165 use async_trait::async_trait;
1166 use devboy_core::types::ChatType;
1167 use devboy_core::{
1168 Comment, CreateCommentInput, CreateIssueInput, Discussion, FileDiff, GetChatsParams,
1169 GetMessagesParams, Issue, IssueFilter, IssueProvider, KbPage, KbPageContent, KbSpace,
1170 KnowledgeBaseProvider, ListPagesParams, MergeRequest, MergeRequestProvider, MessageAuthor,
1171 MessengerChat, MessengerMessage, MessengerProvider, MrFilter, SearchKbParams,
1172 SearchMessagesParams, SendMessageParams, UpdateIssueInput, User,
1173 };
1174
1175 struct TestProvider;
1177
1178 #[async_trait]
1179 impl IssueProvider for TestProvider {
1180 async fn get_issues(
1181 &self,
1182 _filter: IssueFilter,
1183 ) -> devboy_core::Result<devboy_core::ProviderResult<Issue>> {
1184 Ok(vec![].into())
1185 }
1186 async fn get_issue(&self, _key: &str) -> devboy_core::Result<Issue> {
1187 Err(devboy_core::Error::NotFound("not found".into()))
1188 }
1189 async fn create_issue(&self, _input: CreateIssueInput) -> devboy_core::Result<Issue> {
1190 Err(devboy_core::Error::NotFound("not found".into()))
1191 }
1192 async fn update_issue(
1193 &self,
1194 _key: &str,
1195 _input: UpdateIssueInput,
1196 ) -> devboy_core::Result<Issue> {
1197 Err(devboy_core::Error::NotFound("not found".into()))
1198 }
1199 async fn get_comments(
1200 &self,
1201 _issue_key: &str,
1202 ) -> devboy_core::Result<devboy_core::ProviderResult<Comment>> {
1203 Ok(vec![].into())
1204 }
1205 async fn add_comment(&self, _issue_key: &str, _body: &str) -> devboy_core::Result<Comment> {
1206 Err(devboy_core::Error::NotFound("not found".into()))
1207 }
1208 fn provider_name(&self) -> &'static str {
1209 "github" }
1211 }
1212
1213 #[async_trait]
1214 impl MergeRequestProvider for TestProvider {
1215 async fn get_merge_requests(
1216 &self,
1217 _filter: MrFilter,
1218 ) -> devboy_core::Result<devboy_core::ProviderResult<MergeRequest>> {
1219 Ok(vec![].into())
1220 }
1221 async fn get_merge_request(&self, _key: &str) -> devboy_core::Result<MergeRequest> {
1222 Err(devboy_core::Error::NotFound("not found".into()))
1223 }
1224 async fn get_discussions(
1225 &self,
1226 _mr_key: &str,
1227 ) -> devboy_core::Result<devboy_core::ProviderResult<Discussion>> {
1228 Ok(vec![].into())
1229 }
1230 async fn get_diffs(
1231 &self,
1232 _mr_key: &str,
1233 ) -> devboy_core::Result<devboy_core::ProviderResult<FileDiff>> {
1234 Ok(vec![].into())
1235 }
1236 async fn add_comment(
1237 &self,
1238 _mr_key: &str,
1239 _input: CreateCommentInput,
1240 ) -> devboy_core::Result<Comment> {
1241 Err(devboy_core::Error::NotFound("not found".into()))
1242 }
1243 fn provider_name(&self) -> &'static str {
1244 "github" }
1246 }
1247
1248 #[async_trait]
1249 impl devboy_core::PipelineProvider for TestProvider {
1250 fn provider_name(&self) -> &'static str {
1251 "test"
1252 }
1253 }
1254
1255 #[async_trait]
1256 impl Provider for TestProvider {
1257 async fn get_current_user(&self) -> devboy_core::Result<User> {
1258 Ok(User {
1259 id: "1".to_string(),
1260 username: "test".to_string(),
1261 name: None,
1262 email: None,
1263 avatar_url: None,
1264 })
1265 }
1266 }
1267
1268 struct TestMessengerProvider;
1269
1270 #[async_trait]
1271 impl MessengerProvider for TestMessengerProvider {
1272 fn provider_name(&self) -> &'static str {
1273 "slack"
1274 }
1275
1276 async fn get_chats(
1277 &self,
1278 _params: GetChatsParams,
1279 ) -> devboy_core::Result<devboy_core::ProviderResult<MessengerChat>> {
1280 Ok(vec![MessengerChat {
1281 id: "C123".to_string(),
1282 key: "slack:C123".to_string(),
1283 name: "general".to_string(),
1284 chat_type: ChatType::Channel,
1285 source: "slack".to_string(),
1286 member_count: Some(3),
1287 description: None,
1288 is_active: true,
1289 }]
1290 .into())
1291 }
1292
1293 async fn get_messages(
1294 &self,
1295 _params: GetMessagesParams,
1296 ) -> devboy_core::Result<devboy_core::ProviderResult<MessengerMessage>> {
1297 Ok(vec![].into())
1298 }
1299
1300 async fn search_messages(
1301 &self,
1302 _params: SearchMessagesParams,
1303 ) -> devboy_core::Result<devboy_core::ProviderResult<MessengerMessage>> {
1304 Ok(vec![].into())
1305 }
1306
1307 async fn send_message(
1308 &self,
1309 _params: SendMessageParams,
1310 ) -> devboy_core::Result<MessengerMessage> {
1311 Ok(MessengerMessage {
1312 id: "1710000000.000100".to_string(),
1313 chat_id: "C123".to_string(),
1314 text: "test".to_string(),
1315 author: MessageAuthor {
1316 id: "U123".to_string(),
1317 name: "DevBoy".to_string(),
1318 username: Some("devboy".to_string()),
1319 avatar_url: None,
1320 },
1321 source: "slack".to_string(),
1322 timestamp: "1710000000.000100".to_string(),
1323 thread_id: None,
1324 reply_to_id: None,
1325 attachments: vec![],
1326 is_edited: false,
1327 })
1328 }
1329 }
1330
1331 struct TestKnowledgeBaseProvider;
1332
1333 #[async_trait]
1334 impl KnowledgeBaseProvider for TestKnowledgeBaseProvider {
1335 fn provider_name(&self) -> &'static str {
1336 "confluence"
1337 }
1338
1339 async fn get_spaces(&self) -> devboy_core::Result<devboy_core::ProviderResult<KbSpace>> {
1340 Ok(vec![KbSpace {
1341 id: "space-1".to_string(),
1342 key: "ENG".to_string(),
1343 name: "Engineering".to_string(),
1344 space_type: Some("global".to_string()),
1345 status: Some("current".to_string()),
1346 description: Some("Team docs".to_string()),
1347 url: Some("https://wiki.example.com/spaces/ENG".to_string()),
1348 }]
1349 .into())
1350 }
1351
1352 async fn list_pages(
1353 &self,
1354 _params: ListPagesParams,
1355 ) -> devboy_core::Result<devboy_core::ProviderResult<KbPage>> {
1356 Ok(vec![KbPage {
1357 id: "42".to_string(),
1358 title: "Architecture".to_string(),
1359 space_key: Some("ENG".to_string()),
1360 url: Some("https://wiki.example.com/pages/42".to_string()),
1361 version: Some(3),
1362 last_modified: None,
1363 author: Some("Alice".to_string()),
1364 excerpt: Some("System architecture".to_string()),
1365 }]
1366 .into())
1367 }
1368
1369 async fn get_page(&self, page_id: &str) -> devboy_core::Result<KbPageContent> {
1370 Ok(KbPageContent {
1371 page: KbPage {
1372 id: page_id.to_string(),
1373 title: "Architecture".to_string(),
1374 space_key: Some("ENG".to_string()),
1375 url: Some(format!("https://wiki.example.com/pages/{page_id}")),
1376 version: Some(3),
1377 last_modified: None,
1378 author: Some("Alice".to_string()),
1379 excerpt: Some("System architecture".to_string()),
1380 },
1381 content: "# Architecture".to_string(),
1382 content_type: "markdown".to_string(),
1383 ancestors: vec![],
1384 labels: vec!["docs".to_string()],
1385 })
1386 }
1387
1388 async fn create_page(
1389 &self,
1390 _params: devboy_core::CreatePageParams,
1391 ) -> devboy_core::Result<KbPage> {
1392 Err(devboy_core::Error::ProviderUnsupported {
1393 provider: "confluence".to_string(),
1394 operation: "create_page".to_string(),
1395 })
1396 }
1397
1398 async fn update_page(
1399 &self,
1400 _params: devboy_core::UpdatePageParams,
1401 ) -> devboy_core::Result<KbPage> {
1402 Err(devboy_core::Error::ProviderUnsupported {
1403 provider: "confluence".to_string(),
1404 operation: "update_page".to_string(),
1405 })
1406 }
1407
1408 async fn search(
1409 &self,
1410 _params: SearchKbParams,
1411 ) -> devboy_core::Result<devboy_core::ProviderResult<KbPage>> {
1412 Ok(vec![KbPage {
1413 id: "42".to_string(),
1414 title: "Architecture".to_string(),
1415 space_key: Some("ENG".to_string()),
1416 url: Some("https://wiki.example.com/pages/42".to_string()),
1417 version: Some(3),
1418 last_modified: None,
1419 author: Some("Alice".to_string()),
1420 excerpt: Some("System architecture".to_string()),
1421 }]
1422 .into())
1423 }
1424 }
1425
/// A freshly built server has no providers and is not yet initialized.
#[test]
fn test_server_creation() {
    let fresh = McpServer::new();

    let provider_list_is_empty = fresh.providers().is_empty();
    assert!(provider_list_is_empty);
    assert!(!fresh.initialized);
}
1432
1433 #[test]
1434 fn test_initialize_response() {
1435 let mut server = McpServer::new();
1436
1437 let req = JsonRpcRequest {
1438 jsonrpc: JSONRPC_VERSION.to_string(),
1439 id: RequestId::Number(1),
1440 method: "initialize".to_string(),
1441 params: Some(serde_json::json!({
1442 "protocolVersion": "2025-11-25",
1443 "capabilities": {},
1444 "clientInfo": {
1445 "name": "test-client",
1446 "version": "1.0.0"
1447 }
1448 })),
1449 };
1450
1451 let resp = tokio::runtime::Runtime::new()
1452 .unwrap()
1453 .block_on(server.handle_request(req));
1454
1455 assert!(resp.result.is_some());
1456 assert!(resp.error.is_none());
1457 assert!(server.initialized);
1458 }
1459
/// With no providers registered, only the context-management tools are
/// listed; issue and merge-request tools are absent.
#[test]
fn test_tools_list_without_providers() {
    let server = McpServer::new();

    let response = server.handle_tools_list(RequestId::Number(1));
    assert!(response.result.is_some());

    let listing: ToolsListResult = serde_json::from_value(response.result.unwrap()).unwrap();
    let has_tool = |name: &str| listing.tools.iter().any(|tool| tool.name == name);

    // Context tools are always available.
    assert!(has_tool("list_contexts"));
    assert!(has_tool("use_context"));
    assert!(has_tool("get_current_context"));

    // Provider-backed tools must not be listed.
    assert!(!has_tool("get_issues"));
    assert!(!has_tool("get_merge_requests"));
}
1479
/// Registering `TestProvider` makes issue and merge-request tools appear
/// alongside the context-management tools.
#[test]
fn test_tools_list_with_provider() {
    let mut server = McpServer::new();
    server.add_provider(Arc::new(TestProvider));

    let response = server.handle_tools_list(RequestId::Number(1));
    assert!(response.result.is_some());

    let listing: ToolsListResult = serde_json::from_value(response.result.unwrap()).unwrap();
    assert!(!listing.tools.is_empty());

    let has_tool = |name: &str| listing.tools.iter().any(|tool| tool.name == name);
    assert!(has_tool("get_issues"));
    assert!(has_tool("get_merge_requests"));
    assert!(has_tool("list_contexts"));
    assert!(has_tool("use_context"));
    assert!(has_tool("get_current_context"));
}
1498
/// `ping` with a string request id answers with a result and no error.
#[test]
fn test_ping() {
    let server = McpServer::new();
    let ping_id = RequestId::String(String::from("ping-1"));

    let pong = server.handle_ping(ping_id);

    assert!(pong.result.is_some());
    assert!(pong.error.is_none());
}
1507
/// Initializing a server whose `initialized` flag is already set yields an
/// error response without a result.
#[test]
fn test_double_initialize_error() {
    let mut server = McpServer::new();
    // Pretend a previous `initialize` already happened.
    server.initialized = true;

    let second_attempt = server.handle_initialize(RequestId::Number(1), None);

    assert!(second_attempt.error.is_some());
    assert!(second_attempt.result.is_none());
}
1518
1519 #[test]
1520 fn test_unknown_method() {
1521 let mut server = McpServer::new();
1522
1523 let req = JsonRpcRequest {
1524 jsonrpc: JSONRPC_VERSION.to_string(),
1525 id: RequestId::Number(1),
1526 method: "unknown/method".to_string(),
1527 params: None,
1528 };
1529
1530 let resp = tokio::runtime::Runtime::new()
1531 .unwrap()
1532 .block_on(server.handle_request(req));
1533
1534 assert!(resp.error.is_some());
1535 assert_eq!(resp.error.unwrap().code, JsonRpcError::METHOD_NOT_FOUND);
1536 }
1537
/// `providers()` starts empty and reports one entry after `add_provider`.
#[test]
fn test_add_provider_and_providers() {
    let mut server = McpServer::new();
    assert!(server.providers().is_empty());

    let stub = Arc::new(TestProvider);
    server.add_provider(stub);

    assert_eq!(server.providers().len(), 1);
}
1546
/// Smoke test: the `initialized` notification is handled without panicking.
#[test]
fn test_handle_notification_initialized() {
    let method = "initialized";
    let mut server = McpServer::new();
    server.handle_notification(method);
}
1553
/// Smoke test: the `notifications/cancelled` notification is handled
/// without panicking.
#[test]
fn test_handle_notification_cancelled() {
    let method = "notifications/cancelled";
    let mut server = McpServer::new();
    server.handle_notification(method);
}
1560
/// Smoke test: an unrecognised notification name is handled without
/// panicking.
#[test]
fn test_handle_notification_unknown() {
    let method = "some/unknown/notification";
    let mut server = McpServer::new();
    server.handle_notification(method);
}
1567
1568 #[tokio::test]
1569 async fn test_handle_message_notification() {
1570 let mut server = McpServer::new();
1571
1572 let msg = IncomingMessage::Notification(crate::protocol::JsonRpcNotification {
1573 jsonrpc: JSONRPC_VERSION.to_string(),
1574 method: "initialized".to_string(),
1575 params: None,
1576 });
1577
1578 let response = server.handle_message(msg).await;
1579 assert!(response.is_none());
1581 }
1582
1583 #[tokio::test]
1584 async fn test_handle_message_request() {
1585 let mut server = McpServer::new();
1586
1587 let msg = IncomingMessage::Request(JsonRpcRequest {
1588 jsonrpc: JSONRPC_VERSION.to_string(),
1589 id: RequestId::Number(1),
1590 method: "ping".to_string(),
1591 params: None,
1592 });
1593
1594 let response = server.handle_message(msg).await;
1595 assert!(response.is_some());
1597 let resp = response.unwrap();
1598 assert!(resp.result.is_some());
1599 }
1600
1601 #[tokio::test]
1602 async fn test_handle_tools_call() {
1603 let mut server = McpServer::new();
1604
1605 let req = JsonRpcRequest {
1606 jsonrpc: JSONRPC_VERSION.to_string(),
1607 id: RequestId::Number(1),
1608 method: "tools/call".to_string(),
1609 params: Some(serde_json::json!({
1610 "name": "get_issues",
1611 "arguments": {}
1612 })),
1613 };
1614
1615 let resp = server.handle_request(req).await;
1616 assert!(resp.result.is_some());
1618 }
1619
1620 #[tokio::test]
1621 async fn test_handle_tools_call_missing_params() {
1622 let mut server = McpServer::new();
1623
1624 let req = JsonRpcRequest {
1625 jsonrpc: JSONRPC_VERSION.to_string(),
1626 id: RequestId::Number(1),
1627 method: "tools/call".to_string(),
1628 params: None,
1629 };
1630
1631 let resp = server.handle_request(req).await;
1632 assert!(resp.error.is_some());
1633 }
1634
1635 #[tokio::test]
1636 async fn test_handle_tools_call_invalid_params() {
1637 let mut server = McpServer::new();
1638
1639 let req = JsonRpcRequest {
1640 jsonrpc: JSONRPC_VERSION.to_string(),
1641 id: RequestId::Number(1),
1642 method: "tools/call".to_string(),
1643 params: Some(serde_json::json!("not an object")),
1644 };
1645
1646 let resp = server.handle_request(req).await;
1647 assert!(resp.error.is_some());
1648 }
1649
/// `handle_initialize` without params still succeeds and marks the server
/// as initialized.
#[test]
fn test_initialize_without_params() {
    let mut server = McpServer::new();

    let outcome = server.handle_initialize(RequestId::Number(1), None);

    assert!(outcome.result.is_some());
    assert!(outcome.error.is_none());
    assert!(server.initialized);
}
1660
/// `handle_initialize` with params that do not match the expected shape
/// still yields a result and marks the server as initialized.
#[test]
fn test_initialize_with_invalid_params() {
    let mut server = McpServer::new();
    let unexpected_params = serde_json::json!({"invalid": true});

    let outcome = server.handle_initialize(RequestId::Number(1), Some(unexpected_params));

    assert!(outcome.result.is_some());
    assert!(server.initialized);
}
1674
/// A server built via `Default` starts without providers.
#[test]
fn test_default_trait() {
    let from_default = McpServer::default();
    assert!(from_default.providers().is_empty());
}
1680
/// Switching to a context that was never registered fails with a message
/// containing "not found".
#[test]
fn test_context_switch_missing_context() {
    let server = McpServer::new();

    let failure = server.set_active_context("missing").unwrap_err();
    let message = failure.to_string();

    assert!(message.contains("not found"));
}
1687
/// The server starts with a single active `default` context; a newly
/// ensured context is listed after it and can be activated.
#[test]
fn test_context_names_and_active_context_switch() {
    let mut server = McpServer::new();

    // Initial state: only `default`, and it is active.
    assert_eq!(server.active_context_name(), String::from("default"));
    assert_eq!(server.context_names(), vec![String::from("default")]);

    server.ensure_context("workspace");
    let expected_names = vec![String::from("default"), String::from("workspace")];
    assert_eq!(server.context_names(), expected_names);

    server.set_active_context("workspace").unwrap();
    assert_eq!(server.active_context_name(), String::from("workspace"));
}
1705
1706 #[tokio::test]
1707 async fn test_tools_call_get_current_context() {
1708 let mut server = McpServer::new();
1709 server.contexts.insert("workspace".to_string(), vec![]);
1710 server.set_active_context("workspace").unwrap();
1711
1712 let req = JsonRpcRequest {
1713 jsonrpc: JSONRPC_VERSION.to_string(),
1714 id: RequestId::Number(1),
1715 method: "tools/call".to_string(),
1716 params: Some(serde_json::json!({
1717 "name": "get_current_context",
1718 "arguments": {}
1719 })),
1720 };
1721
1722 let resp = server.handle_request(req).await;
1723 assert!(resp.error.is_none());
1724 let result: ToolCallResult = serde_json::from_value(resp.result.unwrap()).unwrap();
1725 let text = match &result.content[0] {
1726 ToolResultContent::Text { text } => text,
1727 };
1728 assert_eq!(text, "workspace");
1729 assert_eq!(result.is_error, None);
1730 }
1731
1732 #[tokio::test]
1733 async fn test_tools_call_list_contexts_marks_active() {
1734 let mut server = McpServer::new();
1735 server.contexts.insert("workspace".to_string(), vec![]);
1736 server.set_active_context("workspace").unwrap();
1737
1738 let req = JsonRpcRequest {
1739 jsonrpc: JSONRPC_VERSION.to_string(),
1740 id: RequestId::Number(2),
1741 method: "tools/call".to_string(),
1742 params: Some(serde_json::json!({
1743 "name": "list_contexts",
1744 "arguments": {}
1745 })),
1746 };
1747
1748 let resp = server.handle_request(req).await;
1749 assert!(resp.error.is_none());
1750 let result: ToolCallResult = serde_json::from_value(resp.result.unwrap()).unwrap();
1751 let text = match &result.content[0] {
1752 ToolResultContent::Text { text } => text,
1753 };
1754 assert!(text.contains("* default"));
1755 assert!(text.contains("* workspace (active)"));
1756 }
1757
1758 #[tokio::test]
1759 async fn test_tools_call_use_context_success_and_error_paths() {
1760 let mut server = McpServer::new();
1761 server.contexts.insert("workspace".to_string(), vec![]);
1762
1763 let missing_name_req = JsonRpcRequest {
1764 jsonrpc: JSONRPC_VERSION.to_string(),
1765 id: RequestId::Number(3),
1766 method: "tools/call".to_string(),
1767 params: Some(serde_json::json!({
1768 "name": "use_context",
1769 "arguments": {}
1770 })),
1771 };
1772 let missing_name_resp = server.handle_request(missing_name_req).await;
1773 let missing_name_result: ToolCallResult =
1774 serde_json::from_value(missing_name_resp.result.unwrap()).unwrap();
1775 assert_eq!(missing_name_result.is_error, Some(true));
1776
1777 let missing_context_req = JsonRpcRequest {
1778 jsonrpc: JSONRPC_VERSION.to_string(),
1779 id: RequestId::Number(4),
1780 method: "tools/call".to_string(),
1781 params: Some(serde_json::json!({
1782 "name": "use_context",
1783 "arguments": { "name": "missing" }
1784 })),
1785 };
1786 let missing_context_resp = server.handle_request(missing_context_req).await;
1787 let missing_context_result: ToolCallResult =
1788 serde_json::from_value(missing_context_resp.result.unwrap()).unwrap();
1789 assert_eq!(missing_context_result.is_error, Some(true));
1790
1791 let success_req = JsonRpcRequest {
1792 jsonrpc: JSONRPC_VERSION.to_string(),
1793 id: RequestId::Number(5),
1794 method: "tools/call".to_string(),
1795 params: Some(serde_json::json!({
1796 "name": "use_context",
1797 "arguments": { "name": "workspace" }
1798 })),
1799 };
1800 let success_resp = server.handle_request(success_req).await;
1801 let success_result: ToolCallResult =
1802 serde_json::from_value(success_resp.result.unwrap()).unwrap();
1803 assert_eq!(success_result.is_error, None);
1804 assert_eq!(server.active_context_name(), "workspace".to_string());
1805 }
1806
/// Smoke test: installing a fresh `ProxyManager` does not panic.
#[test]
fn test_set_proxy_manager() {
    let mut server = McpServer::new();
    server.set_proxy_manager(ProxyManager::new());
}
1814
/// With a provider and an empty `ProxyManager` installed, the listing has
/// the built-in tools and no tool whose name contains `__`.
#[test]
fn test_tools_list_includes_proxy_tools() {
    let mut server = McpServer::new();
    server.add_provider(Arc::new(TestProvider));
    server.set_proxy_manager(ProxyManager::new());

    let response = server.handle_tools_list(RequestId::Number(1));
    let listing: ToolsListResult = serde_json::from_value(response.result.unwrap()).unwrap();

    let has_tool = |name: &str| listing.tools.iter().any(|tool| tool.name == name);
    assert!(has_tool("get_issues"));
    assert!(has_tool("list_contexts"));
    assert!(has_tool("use_context"));
    assert!(has_tool("get_current_context"));

    // No listed tool name contains a double underscore.
    assert!(listing.tools.iter().all(|tool| !tool.name.contains("__")));
}
1839
/// A default server lists no tool whose name contains `__`.
#[test]
fn test_default_server_has_empty_proxy_manager() {
    let server = McpServer::default();

    let response = server.handle_tools_list(RequestId::Number(1));
    let listing: ToolsListResult = serde_json::from_value(response.result.unwrap()).unwrap();

    assert!(listing.tools.iter().all(|tool| !tool.name.contains("__")));
}
1848
/// Tools named in `disabled` vanish from the listing while the remaining
/// provider and context tools stay visible.
#[test]
fn test_builtin_tools_disabled_filters_tools_list() {
    let mut server = McpServer::new();
    server.add_provider(Arc::new(TestProvider));

    let config = BuiltinToolsConfig {
        disabled: vec![String::from("get_issues"), String::from("create_issue")],
        enabled: Vec::new(),
    };
    server.set_builtin_tools_config(config).unwrap();

    let response = server.handle_tools_list(RequestId::Number(1));
    let listing: ToolsListResult = serde_json::from_value(response.result.unwrap()).unwrap();
    let has_tool = |name: &str| listing.tools.iter().any(|tool| tool.name == name);

    assert!(!has_tool("get_issues"));
    assert!(!has_tool("create_issue"));
    assert!(has_tool("get_merge_requests"));
    assert!(has_tool("list_contexts"));
}
1870
/// With an `enabled` whitelist of the three context tools, exactly those
/// three are listed.
#[test]
fn test_builtin_tools_enabled_whitelist_filters_tools_list() {
    let mut server = McpServer::new();

    let whitelist = vec![
        String::from("list_contexts"),
        String::from("use_context"),
        String::from("get_current_context"),
    ];
    let config = BuiltinToolsConfig {
        disabled: Vec::new(),
        enabled: whitelist,
    };
    server.set_builtin_tools_config(config).unwrap();

    let response = server.handle_tools_list(RequestId::Number(1));
    let listing: ToolsListResult = serde_json::from_value(response.result.unwrap()).unwrap();
    let has_tool = |name: &str| listing.tools.iter().any(|tool| tool.name == name);

    assert_eq!(listing.tools.len(), 3);
    assert!(has_tool("list_contexts"));
    assert!(has_tool("use_context"));
    assert!(has_tool("get_current_context"));
    assert!(!has_tool("get_issues"));
}
1894
1895 #[tokio::test]
1896 async fn test_disabled_tool_call_returns_error() {
1897 let mut server = McpServer::new();
1898 server
1899 .set_builtin_tools_config(BuiltinToolsConfig {
1900 disabled: vec!["get_issues".to_string()],
1901 enabled: vec![],
1902 })
1903 .unwrap();
1904
1905 let req = JsonRpcRequest {
1906 jsonrpc: JSONRPC_VERSION.to_string(),
1907 id: RequestId::Number(1),
1908 method: "tools/call".to_string(),
1909 params: Some(serde_json::json!({
1910 "name": "get_issues",
1911 "arguments": {}
1912 })),
1913 };
1914
1915 let resp = server.handle_request(req).await;
1916 assert!(resp.error.is_some());
1917 let err = resp.error.unwrap();
1918 assert_eq!(err.code, JsonRpcError::METHOD_NOT_FOUND);
1919 assert!(err.message.contains("disabled"));
1920 }
1921
1922 #[tokio::test]
1923 async fn test_disabled_tool_allows_non_disabled() {
1924 let mut server = McpServer::new();
1925 server
1926 .set_builtin_tools_config(BuiltinToolsConfig {
1927 disabled: vec!["get_issues".to_string()],
1928 enabled: vec![],
1929 })
1930 .unwrap();
1931
1932 let req = JsonRpcRequest {
1933 jsonrpc: JSONRPC_VERSION.to_string(),
1934 id: RequestId::Number(1),
1935 method: "tools/call".to_string(),
1936 params: Some(serde_json::json!({
1937 "name": "get_current_context",
1938 "arguments": {}
1939 })),
1940 };
1941
1942 let resp = server.handle_request(req).await;
1943 assert!(resp.error.is_none());
1944 assert!(resp.result.is_some());
1945 }
1946
1947 struct IssueOnlyTestProvider;
1949
1950 #[async_trait]
1951 impl IssueProvider for IssueOnlyTestProvider {
1952 async fn get_issues(
1953 &self,
1954 _filter: IssueFilter,
1955 ) -> devboy_core::Result<devboy_core::ProviderResult<Issue>> {
1956 Ok(vec![].into())
1957 }
1958 async fn get_issue(&self, _key: &str) -> devboy_core::Result<Issue> {
1959 Err(devboy_core::Error::NotFound("not found".into()))
1960 }
1961 async fn create_issue(&self, _input: CreateIssueInput) -> devboy_core::Result<Issue> {
1962 Err(devboy_core::Error::NotFound("not found".into()))
1963 }
1964 async fn update_issue(
1965 &self,
1966 _key: &str,
1967 _input: UpdateIssueInput,
1968 ) -> devboy_core::Result<Issue> {
1969 Err(devboy_core::Error::NotFound("not found".into()))
1970 }
1971 async fn get_comments(
1972 &self,
1973 _issue_key: &str,
1974 ) -> devboy_core::Result<devboy_core::ProviderResult<Comment>> {
1975 Ok(vec![].into())
1976 }
1977 async fn add_comment(&self, _issue_key: &str, _body: &str) -> devboy_core::Result<Comment> {
1978 Err(devboy_core::Error::NotFound("not found".into()))
1979 }
1980 fn provider_name(&self) -> &'static str {
1981 "clickup" }
1983 }
1984
1985 #[async_trait]
1986 impl MergeRequestProvider for IssueOnlyTestProvider {
1987 fn provider_name(&self) -> &'static str {
1988 "clickup"
1989 }
1990 }
1992
1993 #[async_trait]
1994 impl devboy_core::PipelineProvider for IssueOnlyTestProvider {
1995 fn provider_name(&self) -> &'static str {
1996 "test"
1997 }
1998 }
1999
2000 #[async_trait]
2001 impl Provider for IssueOnlyTestProvider {
2002 async fn get_current_user(&self) -> devboy_core::Result<User> {
2003 Ok(User {
2004 id: "1".to_string(),
2005 username: "clickup-user".to_string(),
2006 name: None,
2007 email: None,
2008 avatar_url: None,
2009 })
2010 }
2011 }
2012
/// With only `IssueOnlyTestProvider` registered, issue tools are listed
/// while merge-request tools are not; context tools remain available.
#[test]
fn test_issue_only_provider_has_issue_tools_but_no_mr_tools() {
    let mut server = McpServer::new();
    server.add_provider(Arc::new(IssueOnlyTestProvider));

    let response = server.handle_tools_list(RequestId::Number(1));
    let listing: ToolsListResult = serde_json::from_value(response.result.unwrap()).unwrap();
    let has_tool = |name: &str| listing.tools.iter().any(|tool| tool.name == name);

    // Issue tools are exposed.
    assert!(has_tool("get_issues"));
    assert!(has_tool("get_issue"));
    assert!(has_tool("create_issue"));

    // Merge-request tools are hidden.
    assert!(!has_tool("get_merge_requests"));
    assert!(!has_tool("get_merge_request_discussions"));

    // Context tools are unaffected.
    assert!(has_tool("list_contexts"));
}
2038
/// A provider added to a non-active context is not reported by
/// `providers()` but is returned by `active_providers()` once that context
/// is activated.
#[test]
fn test_add_provider_to_context() {
    let context = "custom";
    let mut server = McpServer::new();
    server.ensure_context(context);
    server.add_provider_to_context(context, Arc::new(TestProvider));

    // `custom` is not active yet, so `providers()` still reports nothing.
    assert!(server.providers().is_empty());

    server.set_active_context(context).unwrap();
    assert_eq!(server.active_providers().len(), 1);
}
2052
/// Knowledge-base tools are listed only while the context that owns the
/// knowledge-base provider is active.
#[test]
fn test_knowledge_base_tools_are_scoped_to_active_context() {
    const KB_TOOL: &str = "get_knowledge_base_spaces";

    let mut server = McpServer::new();
    server.ensure_context("wiki-context");
    server.ensure_context("plain-context");
    server.add_knowledge_base_provider_to_context(
        "wiki-context",
        Arc::new(TestKnowledgeBaseProvider),
    );

    // Context without a knowledge base: tool must be hidden.
    server.set_active_context("plain-context").unwrap();
    let plain_response = server.handle_tools_list(RequestId::Number(1));
    let plain_listing: ToolsListResult =
        serde_json::from_value(plain_response.result.unwrap()).unwrap();
    assert!(plain_listing.tools.iter().all(|tool| tool.name != KB_TOOL));

    // Context with the knowledge base: tool must be listed.
    server.set_active_context("wiki-context").unwrap();
    let wiki_response = server.handle_tools_list(RequestId::Number(2));
    let wiki_listing: ToolsListResult =
        serde_json::from_value(wiki_response.result.unwrap()).unwrap();
    assert!(wiki_listing.tools.iter().any(|tool| tool.name == KB_TOOL));
}
2093
/// Adding a knowledge-base provider to an unknown context registers that
/// context so it can be activated afterwards.
#[test]
fn test_add_knowledge_base_provider_creates_context_for_activation() {
    let mut server = McpServer::new();
    let provider = Arc::new(TestKnowledgeBaseProvider);
    server.add_knowledge_base_provider_to_context("wiki-only", provider);

    let known_contexts = server.context_names();
    assert!(known_contexts.contains(&String::from("wiki-only")));
    assert!(server.set_active_context("wiki-only").is_ok());
}
2105
2106 #[tokio::test]
2107 async fn test_tools_call_dispatches_knowledge_base_provider() {
2108 let mut server = McpServer::new();
2109 server.add_knowledge_base_provider(Arc::new(TestKnowledgeBaseProvider));
2110
2111 let req = JsonRpcRequest {
2112 jsonrpc: JSONRPC_VERSION.to_string(),
2113 id: RequestId::Number(6),
2114 method: "tools/call".to_string(),
2115 params: Some(serde_json::json!({
2116 "name": "get_knowledge_base_spaces",
2117 "arguments": {}
2118 })),
2119 };
2120
2121 let resp = server.handle_request(req).await;
2122 assert!(resp.error.is_none());
2123 let result: ToolCallResult = serde_json::from_value(resp.result.unwrap()).unwrap();
2124 assert_eq!(result.is_error, None);
2125 match &result.content[0] {
2126 ToolResultContent::Text { text } => {
2127 assert!(text.contains("Knowledge Base Spaces"));
2128 assert!(text.contains("Engineering"));
2129 }
2130 }
2131 }
2132
/// The executor built by `create_executor` lists knowledge-base tools only
/// when the active context owns a knowledge-base provider.
#[test]
fn test_create_executor_registers_kb_enricher_for_active_context() {
    const KB_TOOL: &str = "get_knowledge_base_spaces";

    let mut server = McpServer::new();
    server.ensure_context("wiki-context");
    server.ensure_context("plain-context");
    server.add_knowledge_base_provider_to_context(
        "wiki-context",
        Arc::new(TestKnowledgeBaseProvider),
    );

    // Executor for the context without a knowledge base.
    server.set_active_context("plain-context").unwrap();
    let plain_tools = server.create_executor().list_tools();
    assert!(plain_tools.iter().all(|tool| tool.name != KB_TOOL));

    // Executor for the context that owns the knowledge base.
    server.set_active_context("wiki-context").unwrap();
    let wiki_tools = server.create_executor().list_tools();
    assert!(wiki_tools.iter().any(|tool| tool.name == KB_TOOL));
}
2159
2160 use crate::protocol::ToolDefinition;
2165 use crate::routing::RoutingEngine;
2166 use crate::signature_match::{MatchReport, ToolMatch};
2167 use devboy_core::config::{ProxyRoutingConfig, RoutingStrategy};
2168
2169 fn match_report_with(items: Vec<ToolMatch>) -> MatchReport {
2170 let mut r = MatchReport::default();
2171 for m in items {
2172 r.matches.insert(m.tool_name.clone(), m);
2173 }
2174 r
2175 }
2176
2177 #[tokio::test]
2178 async fn test_routing_engine_reject_decision_surfaces_as_error_result() {
2179 let mut server = McpServer::new();
2180 let engine = RoutingEngine::new(ProxyRoutingConfig::default(), MatchReport::default());
2182 server.set_routing_engine(Arc::new(engine));
2183
2184 let req = JsonRpcRequest {
2185 jsonrpc: JSONRPC_VERSION.to_string(),
2186 id: RequestId::Number(1),
2187 method: "tools/call".to_string(),
2188 params: Some(serde_json::json!({
2189 "name": "mystery_tool",
2190 "arguments": {}
2191 })),
2192 };
2193 let resp = server.handle_request(req).await;
2194 let result: ToolCallResult = serde_json::from_value(resp.result.unwrap()).unwrap();
2195 assert_eq!(result.is_error, Some(true));
2196 match &result.content[0] {
2197 ToolResultContent::Text { text } => {
2198 assert!(text.contains("unknown to both local and remote"));
2199 }
2200 }
2201 }
2202
2203 #[tokio::test]
2204 async fn test_routing_engine_local_dispatch_uses_toolhandler() {
2205 let mut server = McpServer::new();
2206 server.add_provider(Arc::new(TestProvider));
2207
2208 let report = match_report_with(vec![ToolMatch {
2209 tool_name: "get_issues".to_string(),
2210 local_present: true,
2211 remote_present: false,
2212 schema_compatible: None,
2213 upstream_prefix: None,
2214 schema_mismatch: None,
2215 }]);
2216 let engine = RoutingEngine::new(ProxyRoutingConfig::default(), report);
2217 server.set_routing_engine(Arc::new(engine));
2218
2219 let req = JsonRpcRequest {
2220 jsonrpc: JSONRPC_VERSION.to_string(),
2221 id: RequestId::Number(1),
2222 method: "tools/call".to_string(),
2223 params: Some(serde_json::json!({
2224 "name": "get_issues",
2225 "arguments": {}
2226 })),
2227 };
2228 let resp = server.handle_request(req).await;
2229 assert!(resp.error.is_none());
2231 let result: ToolCallResult = serde_json::from_value(resp.result.unwrap()).unwrap();
2232 assert!(result.is_error.is_none());
2233 }
2234
2235 #[tokio::test]
2236 async fn test_telemetry_buffer_receives_event_per_call() {
2237 let mut server = McpServer::new();
2238 server.add_provider(Arc::new(TestProvider));
2239
2240 let report = match_report_with(vec![ToolMatch {
2241 tool_name: "get_issues".to_string(),
2242 local_present: true,
2243 remote_present: false,
2244 schema_compatible: None,
2245 upstream_prefix: None,
2246 schema_mismatch: None,
2247 }]);
2248 let engine = RoutingEngine::new(ProxyRoutingConfig::default(), report);
2249 server.set_routing_engine(Arc::new(engine));
2250
2251 let buffer = TelemetryBuffer::new(16);
2252 server.set_telemetry(buffer.clone());
2253
2254 let req = JsonRpcRequest {
2255 jsonrpc: JSONRPC_VERSION.to_string(),
2256 id: RequestId::Number(1),
2257 method: "tools/call".to_string(),
2258 params: Some(serde_json::json!({
2259 "name": "get_issues",
2260 "arguments": {}
2261 })),
2262 };
2263 let _resp = server.handle_request(req).await;
2264
2265 let events = buffer.drain(100).await;
2266 assert_eq!(events.len(), 1);
2267 assert_eq!(events[0].tool, "get_issues");
2268 assert_eq!(events[0].routing_decision, "local_only");
2269 assert_eq!(events[0].status, TelemetryStatus::Success);
2270 }
2271
2272 #[tokio::test]
2273 async fn test_telemetry_event_captures_reason_detail_for_override_rule() {
2274 let mut server = McpServer::new();
2275 server.add_provider(Arc::new(TestProvider));
2276
2277 let report = match_report_with(vec![ToolMatch {
2278 tool_name: "get_issues".to_string(),
2279 local_present: true,
2280 remote_present: true,
2281 schema_compatible: Some(true),
2282 upstream_prefix: Some("cloud".to_string()),
2283 schema_mismatch: None,
2284 }]);
2285 let config = ProxyRoutingConfig {
2287 strategy: RoutingStrategy::Remote,
2288 fallback_on_error: true,
2289 tool_overrides: vec![devboy_core::config::ProxyToolRule {
2290 pattern: "get_*".to_string(),
2291 strategy: RoutingStrategy::Local,
2292 }],
2293 };
2294 let engine = RoutingEngine::new(config, report);
2295 server.set_routing_engine(Arc::new(engine));
2296
2297 let buffer = TelemetryBuffer::new(16);
2298 server.set_telemetry(buffer.clone());
2299
2300 let req = JsonRpcRequest {
2301 jsonrpc: JSONRPC_VERSION.to_string(),
2302 id: RequestId::Number(1),
2303 method: "tools/call".to_string(),
2304 params: Some(serde_json::json!({
2305 "name": "get_issues",
2306 "arguments": {}
2307 })),
2308 };
2309 let _resp = server.handle_request(req).await;
2310
2311 let events = buffer.drain(100).await;
2312 assert_eq!(events.len(), 1);
2313 assert_eq!(events[0].routing_decision, "override_rule");
2314 assert_eq!(events[0].routing_detail.as_deref(), Some("get_*"));
2315 assert!(events[0].upstream.is_none());
2316 }
2317
2318 #[tokio::test]
2319 async fn test_no_routing_engine_keeps_legacy_behaviour() {
2320 let mut server = McpServer::new();
2323 server.add_provider(Arc::new(TestProvider));
2324 let buffer = TelemetryBuffer::new(16);
2325 server.set_telemetry(buffer.clone());
2326
2327 let req = JsonRpcRequest {
2328 jsonrpc: JSONRPC_VERSION.to_string(),
2329 id: RequestId::Number(1),
2330 method: "tools/call".to_string(),
2331 params: Some(serde_json::json!({
2332 "name": "get_issues",
2333 "arguments": {}
2334 })),
2335 };
2336 let resp = server.handle_request(req).await;
2337 assert!(resp.error.is_none());
2338 let events = buffer.drain(100).await;
2339 assert_eq!(events.len(), 1);
2340 assert_eq!(events[0].routing_decision, "legacy_local");
2341 }
2342
2343 #[allow(dead_code)]
2345 fn _unused_cfg_helper_tooldef() -> ToolDefinition {
2346 ToolDefinition {
2347 name: "x".into(),
2348 description: "x".into(),
2349 input_schema: serde_json::json!({}),
2350 category: None,
2351 }
2352 }
2353
/// A messenger provider registered on one context must only contribute its
/// tools while that context is active.
///
/// The provider is attached to `slack-context` only; `get_messenger_chats`
/// must be absent from the tool list under `plain-context` and present under
/// `slack-context`.
#[test]
fn test_messenger_providers_are_scoped_to_active_context() {
    /// True when the listing advertises the messenger chat tool.
    fn lists_messenger_chats(listing: &ToolsListResult) -> bool {
        listing
            .tools
            .iter()
            .any(|tool| tool.name == "get_messenger_chats")
    }

    let mut server = McpServer::new();
    server.ensure_context("slack-context");
    server.ensure_context("plain-context");
    server.add_messenger_provider_to_context("slack-context", Arc::new(TestMessengerProvider));

    // Context without a messenger provider: the tool must not be listed.
    server.set_active_context("plain-context").unwrap();
    let plain_response = server.handle_tools_list(RequestId::Number(1));
    let plain_result: ToolsListResult =
        serde_json::from_value(plain_response.result.unwrap()).unwrap();
    assert!(!lists_messenger_chats(&plain_result));

    // Context that owns the messenger provider: the tool must be listed.
    server.set_active_context("slack-context").unwrap();
    let slack_response = server.handle_tools_list(RequestId::Number(2));
    let slack_result: ToolsListResult =
        serde_json::from_value(slack_response.result.unwrap()).unwrap();
    assert!(lists_messenger_chats(&slack_result));
}
2391
/// Adding a messenger provider to a context that was never created explicitly
/// must make that context appear in `context_names()` and be activatable.
#[test]
fn test_add_messenger_provider_creates_context_for_activation() {
    let mut server = McpServer::new();
    server.add_messenger_provider_to_context("messenger-only", Arc::new(TestMessengerProvider));

    let expected_name = "messenger-only".to_string();
    let known_contexts = server.context_names();
    assert!(known_contexts.contains(&expected_name));

    let activation = server.set_active_context("messenger-only");
    assert!(activation.is_ok());
}
2404
2405 #[tokio::test]
2406 async fn test_deferred_init_resolves_proxy_on_tools_list() {
2407 let mut server = McpServer::new();
2408 server.initialized = true;
2409
2410 let (tx, rx) = oneshot::channel();
2412 server.set_deferred_init(rx);
2413
2414 tokio::spawn(async move {
2416 tokio::time::sleep(std::time::Duration::from_millis(50)).await;
2418 let proxy_manager = ProxyManager::new();
2419 let _ = tx.send(DeferredInit {
2420 proxy_manager,
2421 builtin_tools_config: None,
2422 routing_engine: None,
2423 });
2424 });
2425
2426 let resp = server
2428 .handle_request(JsonRpcRequest {
2429 jsonrpc: JSONRPC_VERSION.to_string(),
2430 id: RequestId::Number(1),
2431 method: "tools/list".to_string(),
2432 params: None,
2433 })
2434 .await;
2435
2436 assert!(resp.result.is_some());
2437 assert!(server.deferred_init.is_none());
2439 }
2440
2441 #[tokio::test]
2442 async fn test_deferred_init_applies_builtin_tools_config() {
2443 let mut server = McpServer::new();
2444 server.initialized = true;
2445 server.add_provider(Arc::new(TestProvider));
2446
2447 let (tx, rx) = oneshot::channel();
2448 server.set_deferred_init(rx);
2449
2450 let _ = tx.send(DeferredInit {
2452 proxy_manager: ProxyManager::new(),
2453 builtin_tools_config: Some(BuiltinToolsConfig {
2454 disabled: vec!["get_issues".to_string()],
2455 enabled: vec![],
2456 }),
2457 routing_engine: None,
2458 });
2459
2460 let resp = server
2461 .handle_request(JsonRpcRequest {
2462 jsonrpc: JSONRPC_VERSION.to_string(),
2463 id: RequestId::Number(1),
2464 method: "tools/list".to_string(),
2465 params: None,
2466 })
2467 .await;
2468
2469 let result: ToolsListResult = serde_json::from_value(resp.result.unwrap()).unwrap();
2470 assert!(!result.tools.iter().any(|t| t.name == "get_issues"));
2472 assert!(result.tools.iter().any(|t| t.name == "get_issue"));
2474 }
2475
/// `enable_layered_pipeline` must populate the `layered_pipeline` field, which
/// starts out empty on a fresh server. The trailing `on_compaction_boundary`
/// call is only exercised for not panicking; nothing is asserted about it.
#[test]
fn test_enable_layered_pipeline_sets_field() {
    use devboy_format_pipeline::adaptive_config::AdaptiveConfig;

    let mut server = McpServer::new();
    assert!(server.layered_pipeline.is_none());

    let pipeline = SessionPipeline::new(AdaptiveConfig::default());
    server.enable_layered_pipeline(pipeline);
    assert!(server.layered_pipeline.is_some());

    server.on_compaction_boundary();
}
2488
2489 #[tokio::test]
2490 async fn test_compaction_notification_advances_partition() {
2491 use devboy_format_pipeline::adaptive_config::AdaptiveConfig;
2492
2493 let mut server = McpServer::new();
2494 server.enable_layered_pipeline(SessionPipeline::new(AdaptiveConfig::default()));
2495 server.handle_notification("notifications/devboy/compact");
2497 server.handle_notification("notifications/totally/unrelated");
2499 }
2500
2501 #[tokio::test]
2502 async fn test_compact_pipeline_cache_internal_tool() {
2503 use devboy_format_pipeline::adaptive_config::AdaptiveConfig;
2504
2505 let mut server = McpServer::new();
2506 server.enable_layered_pipeline(SessionPipeline::new(AdaptiveConfig::default()));
2507
2508 let req = JsonRpcRequest {
2509 jsonrpc: JSONRPC_VERSION.to_string(),
2510 id: RequestId::Number(1),
2511 method: "tools/call".to_string(),
2512 params: Some(serde_json::json!({
2513 "name": "compact_pipeline_cache",
2514 "arguments": {}
2515 })),
2516 };
2517 let resp = server.handle_request(req).await;
2518 assert!(resp.error.is_none());
2519 let result: ToolCallResult = serde_json::from_value(resp.result.unwrap()).unwrap();
2520 assert_eq!(result.is_error, None);
2521 }
2522
2523 #[tokio::test]
2524 async fn test_e2e_read_edit_read_busts_cache_via_server() {
2525 use devboy_format_pipeline::adaptive_config::AdaptiveConfig;
2536
2537 let pipeline = SessionPipeline::new(AdaptiveConfig::default());
2538 let body = "x".repeat(600);
2539
2540 let read_params = crate::protocol::ToolCallParams {
2541 name: "Read".to_string(),
2542 arguments: Some(serde_json::json!({"file_path": "/tmp/e2e.rs"})),
2543 };
2544
2545 let r1 = pipeline.process("req_1", &read_params, ToolCallResult::text(body.clone()), 0);
2547 let crate::protocol::ToolResultContent::Text { text: t1 } = &r1.content[0];
2548 assert_eq!(t1, &body);
2549
2550 let edit_params = crate::protocol::ToolCallParams {
2553 name: "Edit".to_string(),
2554 arguments: Some(serde_json::json!({"file_path": "/tmp/e2e.rs"})),
2555 };
2556 if crate::layered::is_mutating_tool(&edit_params.name)
2557 && let Some(p) = crate::layered::extract_file_path(edit_params.arguments.as_ref())
2558 {
2559 pipeline.invalidate_file(&p);
2560 }
2561
2562 let r3 = pipeline.process(
2564 "req_3",
2565 &read_params,
2566 ToolCallResult::text(body.clone()),
2567 10,
2568 );
2569 let crate::protocol::ToolResultContent::Text { text: t3 } = &r3.content[0];
2570 assert_eq!(
2571 t3, &body,
2572 "Edit must bust the dedup cache so subsequent Read is fresh"
2573 );
2574 }
2575
2576 #[tokio::test]
2577 async fn test_speculate_after_runs_when_enrichment_enabled() {
2578 use devboy_format_pipeline::adaptive_config::AdaptiveConfig;
2583 use std::sync::Arc;
2584
2585 struct StubDispatcher;
2586 #[async_trait::async_trait]
2587 impl crate::speculation::PrefetchDispatcher for StubDispatcher {
2588 async fn dispatch(
2589 &self,
2590 _tool_name: &str,
2591 _args: serde_json::Value,
2592 ) -> Result<String, crate::speculation::PrefetchError> {
2593 Ok("prefetched body".to_string())
2594 }
2595 }
2596
2597 let mut cfg = AdaptiveConfig {
2598 tools: devboy_format_pipeline::tool_defaults::default_tool_value_models(),
2599 ..AdaptiveConfig::default()
2600 };
2601 cfg.enrichment.enabled = true;
2602 cfg.enrichment.prefetch_timeout_ms = 200;
2603 cfg.enrichment.prefetch_budget_tokens = 4_000;
2604
2605 let pipeline = SessionPipeline::new(cfg)
2606 .with_speculation(Arc::new(StubDispatcher))
2607 .await;
2608 let mut server = McpServer::new();
2609 server.enable_layered_pipeline(pipeline);
2610
2611 let _ = server
2619 .handle_request(JsonRpcRequest {
2620 jsonrpc: JSONRPC_VERSION.to_string(),
2621 id: RequestId::Number(1),
2622 method: "tools/call".to_string(),
2623 params: Some(serde_json::json!({
2624 "name": "Glob",
2625 "arguments": {"pattern": "src/**/*.rs"}
2626 })),
2627 })
2628 .await;
2629
2630 let snap = server
2637 .layered_pipeline
2638 .as_ref()
2639 .unwrap()
2640 .enrichment_snapshot();
2641 assert!(snap.total_prefetches < 100, "sanity bound");
2645 }
2646
2647 #[tokio::test]
2648 async fn test_fail_fast_short_circuits_dispatch() {
2649 use devboy_format_pipeline::adaptive_config::AdaptiveConfig;
2653
2654 let mut cfg = AdaptiveConfig {
2655 tools: devboy_format_pipeline::tool_defaults::default_tool_value_models(),
2656 ..AdaptiveConfig::default()
2657 };
2658 cfg.enrichment.enabled = false;
2659
2660 let pipeline = SessionPipeline::new(cfg);
2661 let empty_params = crate::protocol::ToolCallParams {
2664 name: "ToolSearch".to_string(),
2665 arguments: None,
2666 };
2667 for i in 0..2 {
2668 pipeline.process(
2669 &format!("rid_{i}"),
2670 &empty_params,
2671 ToolCallResult::text(String::new()),
2672 i,
2673 );
2674 }
2675 assert!(
2676 pipeline.should_skip("ToolSearch"),
2677 "circuit must be armed after 2 empty responses"
2678 );
2679
2680 let pre_count = pipeline
2681 .enrichment_snapshot()
2682 .inference_calls_saved_fail_fast;
2683
2684 let mut server = McpServer::new();
2685 server.enable_layered_pipeline(pipeline);
2686
2687 let resp = server
2690 .handle_request(JsonRpcRequest {
2691 jsonrpc: JSONRPC_VERSION.to_string(),
2692 id: RequestId::Number(99),
2693 method: "tools/call".to_string(),
2694 params: Some(serde_json::json!({
2695 "name": "ToolSearch",
2696 "arguments": {"query": "anything"}
2697 })),
2698 })
2699 .await;
2700 assert!(resp.error.is_none(), "fail-fast must succeed, not error");
2701 let result = resp.result.expect("must carry a result");
2702 let body = result["content"][0]["text"].as_str().expect("text content");
2703 assert!(
2704 body.contains("fail-fast"),
2705 "expected fail-fast hint, got: {body}"
2706 );
2707 let post_count = server
2709 .layered_pipeline
2710 .as_ref()
2711 .unwrap()
2712 .enrichment_snapshot()
2713 .inference_calls_saved_fail_fast;
2714 assert_eq!(
2715 post_count,
2716 pre_count + 1,
2717 "fail-fast must record the saved call"
2718 );
2719 }
2720
2721 #[tokio::test]
2722 async fn test_layered_pipeline_dedups_repeated_internal_tool_response() {
2723 use devboy_format_pipeline::adaptive_config::AdaptiveConfig;
2724
2725 let mut server = McpServer::new();
2726 server.contexts.insert("workspace".to_string(), vec![]);
2727 server.set_active_context("workspace").unwrap();
2728 server.enable_layered_pipeline(SessionPipeline::new(AdaptiveConfig::default()));
2729
2730 let make_req = |id: i64| JsonRpcRequest {
2735 jsonrpc: JSONRPC_VERSION.to_string(),
2736 id: RequestId::Number(id),
2737 method: "tools/call".to_string(),
2738 params: Some(serde_json::json!({
2739 "name": "get_current_context",
2740 "arguments": {}
2741 })),
2742 };
2743
2744 let r1 = server.handle_request(make_req(1)).await;
2745 let r2 = server.handle_request(make_req(2)).await;
2746 assert!(r1.error.is_none());
2747 assert!(r2.error.is_none());
2748 assert_eq!(r1.result, r2.result);
2751 }
2752}