Skip to main content

mcpr_core/proxy/pipeline/middlewares/
schema_ingest.rs

//! Response-side middleware: spawn fire-and-forget schema ingest on
//! list-method result responses.
//!
//! Reads the originating method from `cx.working.request_method`
//! (stashed by `SessionTouchMiddleware`), reconstructs the request
//! envelope the ingest task needs, and hands ownership to the spawned
//! task. The hot path never waits for merge / hash / store.
8
9use async_trait::async_trait;
10use serde_json::Value;
11
12use crate::event::{ProxyEvent, SchemaVersionCreatedEvent};
13use crate::protocol::mcp::{MessageKind, ServerKind};
14use crate::protocol::schema as proto_schema;
15use crate::proxy::pipeline::middleware::ResponseMiddleware;
16use crate::proxy::pipeline::values::{Context, Response};
17
18pub struct SchemaIngestMiddleware;
19
20#[async_trait]
21impl ResponseMiddleware for SchemaIngestMiddleware {
22    fn name(&self) -> &'static str {
23        "schema_ingest"
24    }
25
26    async fn on_response(&self, resp: Response, cx: &mut Context) -> Response {
27        let message = match &resp {
28            Response::McpBuffered { message, .. } => message,
29            _ => return resp,
30        };
31        if !matches!(message.kind, MessageKind::Server(ServerKind::Result)) {
32            return resp;
33        }
34        let Some(method) = cx.working.request_method.as_ref() else {
35            return resp;
36        };
37        if !proto_schema::is_schema_method(method) {
38            return resp;
39        }
40        let Some(method_str) = method.as_str() else {
41            return resp;
42        };
43        let Some(result_val) = message.envelope.result_as::<Value>() else {
44            return resp;
45        };
46        // `SchemaManager::ingest` reads `response_body["result"]`, so
47        // pass the full JSON-RPC shape. The request envelope is best-
48        // effort — we don't have the original request params on the
49        // response side yet, so the `params` field is `null`. That
50        // matches today's behavior when the incoming request omits
51        // `params` (e.g. vanilla `tools/list`).
52        let response_val = serde_json::json!({
53            "jsonrpc": "2.0",
54            "result": result_val,
55        });
56        let req_val = serde_json::json!({
57            "jsonrpc": "2.0",
58            "method": method_str,
59        });
60
61        let state = cx.intake.proxy.clone();
62        let bus = state.event_bus.clone();
63        let upstream_id = state.name.clone();
64        let upstream_url = state.mcp_upstream.clone();
65        let method_owned = method_str.to_string();
66
67        state
68            .schema_manager
69            .spawn_ingest(method_owned, req_val, response_val, move |version| {
70                bus.emit(ProxyEvent::SchemaVersionCreated(
71                    SchemaVersionCreatedEvent {
72                        ts: chrono::Utc::now().timestamp_millis(),
73                        upstream_id,
74                        upstream_url,
75                        method: version.method.clone(),
76                        version: version.version,
77                        version_id: version.id.to_string(),
78                        content_hash: version.content_hash.clone(),
79                        payload: (*version.payload).clone(),
80                    },
81                ));
82            });
83        // Note: the driver's middleware wrap times the whole call,
84        // including spawn-prep. The background ingest task runs off the
85        // hot path and is deliberately excluded from this figure.
86
87        resp
88    }
89}
90
#[cfg(test)]
// Test names use a `subject__scenario` pattern; the double underscore
// trips the `non_snake_case` lint.
#[allow(non_snake_case)]
mod tests {
    use super::*;

    use axum::http::StatusCode;

    use crate::protocol::mcp::{ClientMethod, ToolsMethod};
    use crate::proxy::pipeline::middlewares::test_support::{
        mcp_buffered_response, set_request_method, test_context, test_proxy_with_sink,
    };

    /// A non-buffered response comes back untouched and emits nothing.
    #[tokio::test]
    async fn on_response__non_buffered_passthrough() {
        let (proxy, sink, handle) = test_proxy_with_sink();
        let mut cx = test_context(proxy.clone());
        set_request_method(&mut cx, ClientMethod::Tools(ToolsMethod::List));
        let upstream_failure = Response::Upstream502 {
            reason: "boom".into(),
        };

        let returned = SchemaIngestMiddleware
            .on_response(upstream_failure, &mut cx)
            .await;
        assert!(matches!(returned, Response::Upstream502 { .. }));

        proxy.schema_manager.wait_idle().await;
        handle.shutdown().await;
        assert!(sink.snapshot().is_empty());
    }

    /// Without a stashed request method no event is emitted.
    #[tokio::test]
    async fn on_response__no_request_method_passthrough() {
        let (proxy, sink, handle) = test_proxy_with_sink();
        let mut cx = test_context(proxy.clone());
        let body = r#"{"jsonrpc":"2.0","id":1,"result":{"tools":[]}}"#;
        let buffered = mcp_buffered_response(body, StatusCode::OK);

        SchemaIngestMiddleware.on_response(buffered, &mut cx).await;

        proxy.schema_manager.wait_idle().await;
        handle.shutdown().await;
        assert!(sink.snapshot().is_empty());
    }

    /// A result for a non-schema method (`tools/call`) emits nothing.
    #[tokio::test]
    async fn on_response__non_schema_method_passthrough() {
        let (proxy, sink, handle) = test_proxy_with_sink();
        let mut cx = test_context(proxy.clone());
        set_request_method(&mut cx, ClientMethod::Tools(ToolsMethod::Call));
        let body = r#"{"jsonrpc":"2.0","id":1,"result":{"content":[]}}"#;
        let buffered = mcp_buffered_response(body, StatusCode::OK);

        SchemaIngestMiddleware.on_response(buffered, &mut cx).await;

        proxy.schema_manager.wait_idle().await;
        handle.shutdown().await;
        assert!(sink.snapshot().is_empty());
    }

    /// A server notification is not a result, so nothing is emitted.
    #[tokio::test]
    async fn on_response__server_notification_passthrough() {
        let (proxy, sink, handle) = test_proxy_with_sink();
        let mut cx = test_context(proxy.clone());
        set_request_method(&mut cx, ClientMethod::Tools(ToolsMethod::List));
        let body = r#"{"jsonrpc":"2.0","method":"notifications/tools/list_changed"}"#;
        let buffered = mcp_buffered_response(body, StatusCode::OK);

        SchemaIngestMiddleware.on_response(buffered, &mut cx).await;

        proxy.schema_manager.wait_idle().await;
        handle.shutdown().await;
        assert!(sink.snapshot().is_empty());
    }

    /// A `tools/list` result produces a `SchemaVersionCreated` event.
    #[tokio::test]
    async fn on_response__tools_list_spawns_and_emits() {
        let (proxy, sink, handle) = test_proxy_with_sink();
        let mut cx = test_context(proxy.clone());
        set_request_method(&mut cx, ClientMethod::Tools(ToolsMethod::List));
        let body = r#"{"jsonrpc":"2.0","id":1,"result":{"tools":[{"name":"one"}]}}"#;
        let buffered = mcp_buffered_response(body, StatusCode::OK);

        SchemaIngestMiddleware.on_response(buffered, &mut cx).await;

        // Let the background ingest finish and the bus drain before
        // inspecting what the sink captured.
        proxy.schema_manager.wait_idle().await;
        handle.shutdown().await;

        let saw_tools_list_version = sink.snapshot().iter().any(|event| {
            matches!(
                event,
                ProxyEvent::SchemaVersionCreated(created) if created.method == "tools/list"
            )
        });
        assert!(
            saw_tools_list_version,
            "expected SchemaVersionCreated for tools/list"
        );
    }
}