Skip to main content

devboy_mcp/
prefetch_adapter.rs

1//! Paper 3 — production [`PrefetchDispatcher`] adapter.
2//!
3//! Bridges [`SpeculationEngine`] to [`McpServer::execute_for_prefetch`]
4//! so the speculation path runs against the same routing /
5//! transparent-proxy / fallback machinery as the main flow. The
6//! adapter holds an `Arc<McpServer>` (server is consumed once at
7//! startup), and forwards every dispatch through the server's
8//! routing engine.
9//!
10//! Failure modes are converted to [`PrefetchError`] variants so the
11//! engine can count them as wasted prefetches without surfacing
12//! them to the LLM stream.
13//!
14//! Wiring contract:
15//!
16//! 1. Build `Arc<McpServer>` once at startup.
17//! 2. Construct `McpPrefetchDispatcher::new(Arc::clone(&server))`.
18//! 3. `session_pipeline.with_speculation(Arc::new(dispatcher)).await`.
19//! 4. From now on, `SessionPipeline::speculate_after` will dispatch
20//!    real `tools/call` requests through the server, results land
21//!    in the dedup cache, and the LLM's organic call collapses to L0.
22//!
23//! See `paper-3-tool-aware-enrichment.md` §Race-strategy for the
24//! end-to-end timing of how prefetch results meet the LLM's main
25//! response.
26//!
27//! [`SpeculationEngine`]: crate::speculation::SpeculationEngine
28//! [`PrefetchDispatcher`]: crate::speculation::PrefetchDispatcher
29//! [`McpServer`]: crate::server::McpServer
30//! [`McpServer::execute_for_prefetch`]: crate::server::McpServer::execute_for_prefetch
31//! [`PrefetchError`]: crate::speculation::PrefetchError
32
33use std::sync::Arc;
34
35use async_trait::async_trait;
36use serde_json::Value;
37
38use crate::protocol::ToolResultContent;
39use crate::server::McpServer;
40use crate::speculation::{PrefetchDispatcher, PrefetchError};
41
42/// Production [`PrefetchDispatcher`] backed by an `Arc<McpServer>`.
43pub struct McpPrefetchDispatcher {
44    server: Arc<McpServer>,
45}
46
47impl McpPrefetchDispatcher {
48    pub fn new(server: Arc<McpServer>) -> Self {
49        Self { server }
50    }
51}
52
53#[async_trait]
54impl PrefetchDispatcher for McpPrefetchDispatcher {
55    async fn dispatch(&self, tool_name: &str, args: Value) -> Result<String, PrefetchError> {
56        let result = self
57            .server
58            .execute_for_prefetch(tool_name, Some(args))
59            .await;
60
61        if result.is_error == Some(true) {
62            let detail = result
63                .content
64                .iter()
65                .map(|c| match c {
66                    ToolResultContent::Text { text } => text.clone(),
67                })
68                .next()
69                .unwrap_or_default();
70            return Err(PrefetchError::Rejected(detail));
71        }
72
73        // Concatenate all text chunks (provider tools often return
74        // multiple Text blocks). Empty bodies are returned as
75        // empty strings so the dedup cache sees a deterministic
76        // value — fail-fast circuit will pick them up if needed.
77        let body: String = result
78            .content
79            .into_iter()
80            .map(|c| match c {
81                ToolResultContent::Text { text } => text,
82            })
83            .collect::<Vec<_>>()
84            .join("\n");
85        Ok(body)
86    }
87}
88
#[cfg(test)]
mod tests {
    use super::*;
    use crate::layered::SessionPipeline;
    use crate::protocol::ToolCallResult;
    use devboy_format_pipeline::adaptive_config::AdaptiveConfig;
    use serde_json::json;
    use std::sync::Arc;

    /// Failure mapping: a bare `McpServer` has no providers, so a
    /// dispatch for `Read` is answered with an error result, which the
    /// dispatcher must surface as `PrefetchError::Rejected`.
    ///
    /// Note: despite the test name, no successful body is exercised
    /// here — only the error-to-`Rejected` conversion.
    #[tokio::test]
    async fn dispatcher_routes_through_server_and_returns_body() {
        // With no providers registered, `execute_for_prefetch` falls
        // through to the legacy_dispatch path, which reports unknown
        // tools as errors.
        let server = Arc::new(McpServer::new());
        let dispatcher = McpPrefetchDispatcher::new(Arc::clone(&server));

        let outcome = dispatcher
            .dispatch("Read", json!({"file_path": "/tmp/x"}))
            .await;

        assert!(matches!(outcome, Err(PrefetchError::Rejected(_))));
    }

    /// Each context-management tool must come back as `Rejected` with a
    /// message that mentions "internal".
    #[tokio::test]
    async fn dispatcher_blocks_internal_context_tools() {
        let dispatcher = McpPrefetchDispatcher::new(Arc::new(McpServer::new()));
        let internal_tools = ["use_context", "list_contexts", "get_current_context"];

        for tool in internal_tools {
            let r = dispatcher.dispatch(tool, json!({})).await;
            assert!(
                matches!(&r, Err(PrefetchError::Rejected(msg)) if msg.contains("internal")),
                "{tool} must be rejected as internal, got: {r:?}"
            );
        }
    }

    /// Type-level smoke test: the dispatcher can be handed to
    /// `SessionPipeline::with_speculation` as a
    /// `Arc<dyn PrefetchDispatcher>`. The dispatch outcome itself is
    /// irrelevant here; the unknown-tool error path is covered above.
    #[tokio::test]
    async fn session_pipeline_can_attach_real_dispatcher() {
        let dispatcher: Arc<dyn PrefetchDispatcher> =
            Arc::new(McpPrefetchDispatcher::new(Arc::new(McpServer::new())));
        let _pipeline = SessionPipeline::new(AdaptiveConfig::default())
            .with_speculation(dispatcher)
            .await;
    }

    /// Pins the chunk-joining rule used on the dispatcher's success
    /// path: text chunks are concatenated with a single newline.
    ///
    /// This replicates the joining expression on a hand-built
    /// `ToolCallResult` instead of going through `dispatch`, which would
    /// need a fake provider and a full test-server fixture.
    #[test]
    fn tool_call_result_text_fields_concatenate_with_newline() {
        let result = ToolCallResult {
            content: vec![
                ToolResultContent::Text {
                    text: "first".into(),
                },
                ToolResultContent::Text {
                    text: "second".into(),
                },
            ],
            is_error: Some(false),
        };

        let chunks: Vec<String> = result
            .content
            .into_iter()
            .map(|ToolResultContent::Text { text }| text)
            .collect();

        assert_eq!(chunks.join("\n"), "first\nsecond");
    }
}