mcpr_core/proxy/pipeline/middlewares/
schema_ingest.rs1use async_trait::async_trait;
10use serde_json::Value;
11
12use crate::event::{ProxyEvent, SchemaVersionCreatedEvent};
13use crate::protocol::mcp::{MessageKind, ServerKind};
14use crate::protocol::schema as proto_schema;
15use crate::proxy::pipeline::middleware::ResponseMiddleware;
16use crate::proxy::pipeline::values::{Context, Response};
17
18pub struct SchemaIngestMiddleware;
19
20#[async_trait]
21impl ResponseMiddleware for SchemaIngestMiddleware {
22 fn name(&self) -> &'static str {
23 "schema_ingest"
24 }
25
26 async fn on_response(&self, resp: Response, cx: &mut Context) -> Response {
27 let message = match &resp {
28 Response::McpBuffered { message, .. } => message,
29 _ => return resp,
30 };
31 if !matches!(message.kind, MessageKind::Server(ServerKind::Result)) {
32 return resp;
33 }
34 let Some(method) = cx.working.request_method.as_ref() else {
35 return resp;
36 };
37 if !proto_schema::is_schema_method(method) {
38 return resp;
39 }
40 let Some(method_str) = method.as_str() else {
41 return resp;
42 };
43 let Some(result_val) = message.envelope.result_as::<Value>() else {
44 return resp;
45 };
46 let response_val = serde_json::json!({
53 "jsonrpc": "2.0",
54 "result": result_val,
55 });
56 let req_val = serde_json::json!({
57 "jsonrpc": "2.0",
58 "method": method_str,
59 });
60
61 let state = cx.intake.proxy.clone();
62 let bus = state.event_bus.clone();
63 let upstream_id = state.name.clone();
64 let upstream_url = state.mcp_upstream.clone();
65 let method_owned = method_str.to_string();
66
67 state
68 .schema_manager
69 .spawn_ingest(method_owned, req_val, response_val, move |version| {
70 bus.emit(ProxyEvent::SchemaVersionCreated(
71 SchemaVersionCreatedEvent {
72 ts: chrono::Utc::now().timestamp_millis(),
73 upstream_id,
74 upstream_url,
75 method: version.method.clone(),
76 version: version.version,
77 version_id: version.id.to_string(),
78 content_hash: version.content_hash.clone(),
79 payload: (*version.payload).clone(),
80 },
81 ));
82 });
83 resp
88 }
89}
90
91#[cfg(test)]
92#[allow(non_snake_case)]
93mod tests {
94 use super::*;
95
96 use axum::http::StatusCode;
97
98 use crate::protocol::mcp::{ClientMethod, ToolsMethod};
99 use crate::proxy::pipeline::middlewares::test_support::{
100 mcp_buffered_response, set_request_method, test_context, test_proxy_with_sink,
101 };
102
103 #[tokio::test]
104 async fn on_response__non_buffered_passthrough() {
105 let (proxy, sink, handle) = test_proxy_with_sink();
106 let mut cx = test_context(proxy.clone());
107 set_request_method(&mut cx, ClientMethod::Tools(ToolsMethod::List));
108 let resp = Response::Upstream502 {
109 reason: "boom".into(),
110 };
111
112 let resp = SchemaIngestMiddleware.on_response(resp, &mut cx).await;
113 assert!(matches!(resp, Response::Upstream502 { .. }));
114 proxy.schema_manager.wait_idle().await;
115 handle.shutdown().await;
116 assert!(sink.snapshot().is_empty());
117 }
118
119 #[tokio::test]
120 async fn on_response__no_request_method_passthrough() {
121 let (proxy, sink, handle) = test_proxy_with_sink();
122 let mut cx = test_context(proxy.clone());
123 let resp = mcp_buffered_response(
124 r#"{"jsonrpc":"2.0","id":1,"result":{"tools":[]}}"#,
125 StatusCode::OK,
126 );
127
128 SchemaIngestMiddleware.on_response(resp, &mut cx).await;
129 proxy.schema_manager.wait_idle().await;
130 handle.shutdown().await;
131 assert!(sink.snapshot().is_empty());
132 }
133
134 #[tokio::test]
135 async fn on_response__non_schema_method_passthrough() {
136 let (proxy, sink, handle) = test_proxy_with_sink();
137 let mut cx = test_context(proxy.clone());
138 set_request_method(&mut cx, ClientMethod::Tools(ToolsMethod::Call));
139 let resp = mcp_buffered_response(
140 r#"{"jsonrpc":"2.0","id":1,"result":{"content":[]}}"#,
141 StatusCode::OK,
142 );
143
144 SchemaIngestMiddleware.on_response(resp, &mut cx).await;
145 proxy.schema_manager.wait_idle().await;
146 handle.shutdown().await;
147 assert!(sink.snapshot().is_empty());
148 }
149
150 #[tokio::test]
151 async fn on_response__server_notification_passthrough() {
152 let (proxy, sink, handle) = test_proxy_with_sink();
153 let mut cx = test_context(proxy.clone());
154 set_request_method(&mut cx, ClientMethod::Tools(ToolsMethod::List));
155 let resp = mcp_buffered_response(
156 r#"{"jsonrpc":"2.0","method":"notifications/tools/list_changed"}"#,
157 StatusCode::OK,
158 );
159
160 SchemaIngestMiddleware.on_response(resp, &mut cx).await;
161 proxy.schema_manager.wait_idle().await;
162 handle.shutdown().await;
163 assert!(sink.snapshot().is_empty());
164 }
165
166 #[tokio::test]
167 async fn on_response__tools_list_spawns_and_emits() {
168 let (proxy, sink, handle) = test_proxy_with_sink();
169 let mut cx = test_context(proxy.clone());
170 set_request_method(&mut cx, ClientMethod::Tools(ToolsMethod::List));
171 let resp = mcp_buffered_response(
172 r#"{"jsonrpc":"2.0","id":1,"result":{"tools":[{"name":"one"}]}}"#,
173 StatusCode::OK,
174 );
175
176 SchemaIngestMiddleware.on_response(resp, &mut cx).await;
177 proxy.schema_manager.wait_idle().await;
178 handle.shutdown().await;
179
180 let events = sink.snapshot();
181 let got_schema = events
182 .iter()
183 .any(|e| matches!(e, ProxyEvent::SchemaVersionCreated(v) if v.method == "tools/list"));
184 assert!(got_schema, "expected SchemaVersionCreated for tools/list");
185 }
186}