Skip to main content

mcpr_core/protocol/schema_manager/
manager.rs

1//! `SchemaManager` — top-level per-upstream view of an MCP server's schema.
2//!
3//! Callers feed schema-method responses in via [`SchemaManager::ingest`].
4//! The manager handles pagination buffering, change detection (by content
5//! hash), and version assignment, persisting new versions to a
6//! [`SchemaStore`]. Query methods read back the latest merged payload
7//! without re-hitting the store per item.
8
9use std::sync::Arc;
10use std::sync::atomic::{AtomicUsize, Ordering};
11
12use chrono::{DateTime, Utc};
13use dashmap::DashMap;
14use serde_json::Value;
15use tokio::sync::Notify;
16
17use super::store::SchemaStore;
18use super::version::{SchemaVersion, SchemaVersionId, hash_payload};
19use crate::protocol::schema::{PageStatus, canonical_hash_view, detect_page_status, merge_pages};
20
21/// Tracks in-flight `spawn_ingest` tasks so callers (shutdown handlers,
22/// tests) can wait until the async ingest queue has drained.
23#[derive(Default)]
24struct PendingTracker {
25    count: AtomicUsize,
26    notify: Notify,
27}
28
29impl PendingTracker {
30    fn begin(&self) {
31        self.count.fetch_add(1, Ordering::SeqCst);
32    }
33    fn end(&self) {
34        if self.count.fetch_sub(1, Ordering::SeqCst) == 1 {
35            self.notify.notify_waiters();
36        }
37    }
38    async fn wait_idle(&self) {
39        while self.count.load(Ordering::SeqCst) > 0 {
40            let notified = self.notify.notified();
41            if self.count.load(Ordering::SeqCst) == 0 {
42                return;
43            }
44            notified.await;
45        }
46    }
47}
48
49/// Per-method runtime state held in memory. Separate from the
50/// `SchemaStore` because these fields serve the hot path — change
51/// detection, pagination buffering, stale flag — and would be expensive
52/// to read-through on every ingest.
53#[derive(Default)]
54struct MethodState {
55    page_buffer: Vec<Value>,
56    current_hash: Option<String>,
57    next_version_number: u32,
58    stale: bool,
59    stale_since: Option<DateTime<Utc>>,
60}
61
62/// Top-level handle for one upstream MCP server's schema view.
63///
64/// Generic over the store backend; downstream typically uses
65/// `SchemaManager<MemorySchemaStore>` for the OSS proxy and swaps in a
66/// database-backed store for cloud deployments.
67pub struct SchemaManager<S: SchemaStore> {
68    upstream_id: String,
69    store: S,
70    state: Arc<DashMap<String, MethodState>>,
71    pending: Arc<PendingTracker>,
72}
73
74impl<S: SchemaStore> SchemaManager<S> {
75    pub fn new(upstream_id: impl Into<String>, store: S) -> Self {
76        Self {
77            upstream_id: upstream_id.into(),
78            store,
79            state: Arc::new(DashMap::new()),
80            pending: Arc::new(PendingTracker::default()),
81        }
82    }
83
84    /// Wait until every task spawned via [`spawn_ingest`] has finished.
85    ///
86    /// Used by shutdown/test code so the bus sees every
87    /// `SchemaVersionCreated` event before it drains.
88    pub async fn wait_idle(&self) {
89        self.pending.wait_idle().await;
90    }
91
92    pub fn upstream_id(&self) -> &str {
93        &self.upstream_id
94    }
95
96    /// Seed the in-memory state for `method` from the store.
97    ///
98    /// Callers normally don't need to invoke this directly — `ingest`
99    /// lazy-warms on the first call for a method. Exposed for explicit
100    /// startup warm-up when desired.
101    pub async fn warm(&self, method: &str) {
102        let latest = self
103            .store
104            .latest_version_for_method(&self.upstream_id, method)
105            .await;
106        if let Some(latest) = latest {
107            let mut entry = self.state.entry(method.to_string()).or_default();
108            if entry.current_hash.is_none() {
109                entry.current_hash = Some(latest.content_hash.clone());
110                entry.next_version_number = latest.version + 1;
111            }
112        }
113    }
114
115    /// Bootstrap in-memory state from a pre-existing `SchemaVersion`
116    /// (typically loaded from an external persistent store at startup).
117    ///
118    /// Seeds `current_hash` + `next_version_number` so subsequent
119    /// `ingest` calls with matching content return `None` (no phantom
120    /// new version) and non-matching content increments from
121    /// `version.version + 1`. Also writes the version into the
122    /// manager's in-process store so `latest` / `list_tools` /
123    /// `get_tool` / etc. see it without needing the first live request.
124    ///
125    /// Idempotent per method: if `current_hash` is already set (either
126    /// from a prior preload or a completed ingest), this is a no-op.
127    pub async fn preload(&self, version: SchemaVersion) {
128        {
129            let mut entry = self.state.entry(version.method.clone()).or_default();
130            if entry.current_hash.is_some() {
131                return;
132            }
133            entry.current_hash = Some(version.content_hash.clone());
134            entry.next_version_number = version.version.saturating_add(1);
135        }
136        self.store.put_version(version).await;
137    }
138
139    /// Feed a schema-method response through the manager.
140    ///
141    /// Returns `Some(version)` when a new `SchemaVersion` was created
142    /// (pagination complete AND content differs from the current
143    /// version). Returns `None` when:
144    ///
145    /// - The response is not a complete page (still buffering).
146    /// - The content hash matches the current version.
147    /// - The response has no `result` field.
148    pub async fn ingest(
149        &self,
150        method: &str,
151        request_body: &Value,
152        response_body: &Value,
153    ) -> Option<SchemaVersion> {
154        let result = response_body.get("result")?;
155        let status = detect_page_status(request_body, response_body);
156
157        let merged = {
158            let mut entry = self.state.entry(method.to_string()).or_default();
159            entry.page_buffer.push(result.clone());
160            match status {
161                PageStatus::Complete | PageStatus::LastPage => {
162                    let pages = std::mem::take(&mut entry.page_buffer);
163                    merge_pages(method, &pages)
164                        .unwrap_or_else(|| pages.into_iter().next().unwrap_or(Value::Null))
165                }
166                PageStatus::FirstPage | PageStatus::MiddlePage => return None,
167            }
168        };
169
170        let hash = match canonical_hash_view(method, &merged) {
171            Some(view) => hash_payload(&view),
172            None => hash_payload(&merged),
173        };
174
175        let needs_warm = self
176            .state
177            .get(method)
178            .map(|e| e.current_hash.is_none() && e.next_version_number == 0)
179            .unwrap_or(true);
180        if needs_warm {
181            self.warm(method).await;
182        }
183
184        let (same, version_number) = {
185            let mut entry = self.state.entry(method.to_string()).or_default();
186            if entry.current_hash.as_deref() == Some(hash.as_str()) {
187                (true, 0)
188            } else {
189                let num = entry.next_version_number.max(1);
190                entry.current_hash = Some(hash.clone());
191                entry.next_version_number = num.saturating_add(1);
192                entry.stale = false;
193                entry.stale_since = None;
194                (false, num)
195            }
196        };
197
198        if same {
199            return None;
200        }
201
202        let id = SchemaVersionId(hash.chars().take(16).collect());
203        let version = SchemaVersion {
204            id,
205            upstream_id: self.upstream_id.clone(),
206            method: method.to_string(),
207            version: version_number,
208            payload: Arc::new(merged),
209            content_hash: hash,
210            captured_at: Utc::now(),
211        };
212        Some(self.store.put_version(version).await)
213    }
214
215    /// Spawn an async ingest task so the caller's hot path does not
216    /// pay for merge/hash/store work.
217    ///
218    /// Returns immediately after spawning. Use [`wait_idle`] to block
219    /// until every spawned task (including this one) has completed.
220    ///
221    /// The caller provides a sink closure that receives the new
222    /// [`SchemaVersion`] when one is produced (for emitting events).
223    /// The closure runs on the spawned task, not on the caller.
224    pub fn spawn_ingest<F>(
225        self: &Arc<Self>,
226        method: String,
227        request_body: Value,
228        response_body: Value,
229        on_version: F,
230    ) where
231        F: FnOnce(&SchemaVersion) + Send + 'static,
232    {
233        self.pending.begin();
234        let manager = Arc::clone(self);
235        tokio::spawn(async move {
236            let result = manager.ingest(&method, &request_body, &response_body).await;
237            if let Some(version) = result.as_ref() {
238                on_version(version);
239            }
240            manager.pending.end();
241        });
242    }
243
244    /// Latest stored version for `method`, or `None` if nothing has
245    /// been ingested yet.
246    pub async fn latest(&self, method: &str) -> Option<SchemaVersion> {
247        self.store
248            .latest_version_for_method(&self.upstream_id, method)
249            .await
250    }
251
252    pub async fn list_tools(&self) -> Vec<Value> {
253        self.list_items("tools/list", "tools").await
254    }
255
256    pub async fn list_resources(&self) -> Vec<Value> {
257        self.list_items("resources/list", "resources").await
258    }
259
260    pub async fn list_resource_templates(&self) -> Vec<Value> {
261        self.list_items("resources/templates/list", "resourceTemplates")
262            .await
263    }
264
265    pub async fn list_prompts(&self) -> Vec<Value> {
266        self.list_items("prompts/list", "prompts").await
267    }
268
269    pub async fn get_tool(&self, name: &str) -> Option<Value> {
270        self.find_item_by_field("tools/list", "tools", "name", name)
271            .await
272    }
273
274    pub async fn get_resource(&self, uri: &str) -> Option<Value> {
275        self.find_item_by_field("resources/list", "resources", "uri", uri)
276            .await
277    }
278
279    pub async fn get_prompt(&self, name: &str) -> Option<Value> {
280        self.find_item_by_field("prompts/list", "prompts", "name", name)
281            .await
282    }
283
284    /// Mark the current version for `method` as stale. Idempotent.
285    ///
286    /// Sync on purpose — the stale flag is used by the hot request
287    /// path (observing `notifications/tools/list_changed`) where a
288    /// round-trip to async code would be overkill.
289    pub fn mark_stale(&self, method: &str) {
290        let mut entry = self.state.entry(method.to_string()).or_default();
291        if !entry.stale {
292            entry.stale = true;
293            entry.stale_since = Some(Utc::now());
294        }
295    }
296
297    pub fn is_stale(&self, method: &str) -> bool {
298        self.state.get(method).map(|e| e.stale).unwrap_or(false)
299    }
300
301    pub fn stale_since(&self, method: &str) -> Option<DateTime<Utc>> {
302        self.state.get(method).and_then(|e| e.stale_since)
303    }
304
305    // ── internals ──
306
307    async fn list_items(&self, method: &str, array_key: &str) -> Vec<Value> {
308        let Some(latest) = self.latest(method).await else {
309            return Vec::new();
310        };
311        latest
312            .payload
313            .get(array_key)
314            .and_then(|v| v.as_array())
315            .cloned()
316            .unwrap_or_default()
317    }
318
319    async fn find_item_by_field(
320        &self,
321        method: &str,
322        array_key: &str,
323        field: &str,
324        needle: &str,
325    ) -> Option<Value> {
326        let latest = self.latest(method).await?;
327        let arr = latest.payload.get(array_key).and_then(|v| v.as_array())?;
328        arr.iter()
329            .find(|item| item.get(field).and_then(|v| v.as_str()) == Some(needle))
330            .cloned()
331    }
332}
333
334#[cfg(test)]
335#[allow(non_snake_case)]
336mod tests {
337    use super::*;
338    use crate::protocol::schema_manager::store::MemorySchemaStore;
339    use serde_json::json;
340
341    fn manager() -> SchemaManager<MemorySchemaStore> {
342        SchemaManager::new("proxy-1", MemorySchemaStore::new())
343    }
344
345    fn tools_list_req(cursor: Option<&str>) -> Value {
346        match cursor {
347            Some(c) => {
348                json!({"jsonrpc": "2.0", "id": 1, "method": "tools/list", "params": {"cursor": c}})
349            }
350            None => json!({"jsonrpc": "2.0", "id": 1, "method": "tools/list"}),
351        }
352    }
353
354    fn tools_list_resp(tools: Value, next_cursor: Option<&str>) -> Value {
355        let mut result = json!({"tools": tools});
356        if let Some(c) = next_cursor {
357            result["nextCursor"] = json!(c);
358        }
359        json!({"jsonrpc": "2.0", "id": 1, "result": result})
360    }
361
362    #[tokio::test]
363    async fn ingest__complete_page_creates_version_one() {
364        let m = manager();
365        let req = tools_list_req(None);
366        let resp = tools_list_resp(json!([{"name": "search"}]), None);
367        let v = m.ingest("tools/list", &req, &resp).await.unwrap();
368        assert_eq!(v.version, 1);
369        assert_eq!(v.method, "tools/list");
370        assert_eq!(v.upstream_id, "proxy-1");
371    }
372
373    #[tokio::test]
374    async fn ingest__first_page_buffers_returns_none() {
375        let m = manager();
376        let req = tools_list_req(None);
377        let resp = tools_list_resp(json!([{"name": "a"}]), Some("cur1"));
378        assert!(m.ingest("tools/list", &req, &resp).await.is_none());
379    }
380
381    #[tokio::test]
382    async fn ingest__first_middle_last_chain_merges_once() {
383        let m = manager();
384
385        let r1 = tools_list_resp(json!([{"name": "a"}]), Some("c1"));
386        assert!(
387            m.ingest("tools/list", &tools_list_req(None), &r1)
388                .await
389                .is_none()
390        );
391
392        let r2 = tools_list_resp(json!([{"name": "b"}]), Some("c2"));
393        assert!(
394            m.ingest("tools/list", &tools_list_req(Some("c1")), &r2)
395                .await
396                .is_none()
397        );
398
399        let r3 = tools_list_resp(json!([{"name": "c"}]), None);
400        let v = m
401            .ingest("tools/list", &tools_list_req(Some("c2")), &r3)
402            .await
403            .unwrap();
404
405        let names: Vec<&str> = v.payload["tools"]
406            .as_array()
407            .unwrap()
408            .iter()
409            .map(|t| t["name"].as_str().unwrap())
410            .collect();
411        assert_eq!(names, vec!["a", "b", "c"]);
412        assert_eq!(v.version, 1);
413    }
414
415    #[tokio::test]
416    async fn ingest__unchanged_payload_returns_none() {
417        let m = manager();
418        let req = tools_list_req(None);
419        let resp = tools_list_resp(json!([{"name": "a"}]), None);
420        m.ingest("tools/list", &req, &resp).await.unwrap();
421        assert!(m.ingest("tools/list", &req, &resp).await.is_none());
422    }
423
424    #[tokio::test]
425    async fn preload__seeds_hash_and_version_counter() {
426        // Simulates startup hydration: we hand the manager a v3 version
427        // that was persisted before a restart. The next ingest with the
428        // same content must not mint v4, and the next ingest with
429        // different content must mint v4 (not v1).
430        let m = manager();
431        let req = tools_list_req(None);
432        let stored = json!({"tools": [{"name": "a"}]});
433        let version = SchemaVersion {
434            id: SchemaVersionId("preload-seed-123".to_string()),
435            upstream_id: "proxy-1".to_string(),
436            method: "tools/list".to_string(),
437            version: 3,
438            payload: Arc::new(stored.clone()),
439            content_hash: hash_payload(&stored),
440            captured_at: Utc::now(),
441        };
442        m.preload(version).await;
443
444        // Same-content ingest: no new version.
445        let same = tools_list_resp(json!([{"name": "a"}]), None);
446        assert!(m.ingest("tools/list", &req, &same).await.is_none());
447
448        // Different content: increments to v4, not v1.
449        let changed = tools_list_resp(json!([{"name": "a"}, {"name": "b"}]), None);
450        let v4 = m.ingest("tools/list", &req, &changed).await.unwrap();
451        assert_eq!(v4.version, 4);
452    }
453
454    #[tokio::test]
455    async fn preload__idempotent_second_call_noop() {
456        let m = manager();
457        let stored = json!({"tools": [{"name": "a"}]});
458        let mk = |v: u32, tag: &str| SchemaVersion {
459            id: SchemaVersionId(format!("id-{tag}")),
460            upstream_id: "proxy-1".to_string(),
461            method: "tools/list".to_string(),
462            version: v,
463            payload: Arc::new(stored.clone()),
464            content_hash: format!("hash-{tag}"),
465            captured_at: Utc::now(),
466        };
467
468        m.preload(mk(3, "first")).await;
469        m.preload(mk(99, "second")).await;
470
471        // Second preload was skipped (state already had a hash), so
472        // the counter is 4, not 100.
473        let req = tools_list_req(None);
474        let changed = tools_list_resp(json!([{"name": "b"}]), None);
475        let v = m.ingest("tools/list", &req, &changed).await.unwrap();
476        assert_eq!(v.version, 4);
477    }
478
479    #[tokio::test]
480    async fn preload__makes_list_tools_visible_without_ingest() {
481        let m = manager();
482        let stored = json!({"tools": [{"name": "a"}, {"name": "b"}]});
483        let version = SchemaVersion {
484            id: SchemaVersionId("preload-list".to_string()),
485            upstream_id: "proxy-1".to_string(),
486            method: "tools/list".to_string(),
487            version: 1,
488            payload: Arc::new(stored.clone()),
489            content_hash: hash_payload(&stored),
490            captured_at: Utc::now(),
491        };
492        m.preload(version).await;
493
494        let tools = m.list_tools().await;
495        assert_eq!(tools.len(), 2);
496        assert_eq!(tools[0]["name"], "a");
497    }
498
499    #[tokio::test]
500    async fn ingest__volatile_meta_does_not_create_new_version() {
501        // Regression: dashboards saw 138 versions for a server whose tools
502        // hadn't changed in weeks, because the server regenerated `_meta`
503        // per request. Only the array of items should influence the hash.
504        let m = manager();
505        let req = tools_list_req(None);
506
507        let r1 = json!({
508            "jsonrpc": "2.0", "id": 1,
509            "result": {
510                "tools": [{"name": "a"}],
511                "_meta": {"requestId": "uuid-1"}
512            }
513        });
514        let r2 = json!({
515            "jsonrpc": "2.0", "id": 1,
516            "result": {
517                "tools": [{"name": "a"}],
518                "_meta": {"requestId": "uuid-2"}
519            }
520        });
521
522        let v1 = m.ingest("tools/list", &req, &r1).await.unwrap();
523        assert_eq!(v1.version, 1);
524        assert!(
525            m.ingest("tools/list", &req, &r2).await.is_none(),
526            "different _meta with identical tools must not mint a new version"
527        );
528    }
529
530    #[tokio::test]
531    async fn ingest__description_only_change_does_not_bump_version() {
532        // Regression: a 14-tool server logged 309 schema versions because
533        // upstream descriptions drifted between requests. The contract
534        // (name + inputSchema) is what matters; prose changes must not
535        // mint a new version.
536        let m = manager();
537        let req = tools_list_req(None);
538        let r1 = tools_list_resp(
539            json!([{"name": "a", "description": "old", "inputSchema": {"type": "object"}}]),
540            None,
541        );
542        let r2 = tools_list_resp(
543            json!([{"name": "a", "description": "new", "inputSchema": {"type": "object"}}]),
544            None,
545        );
546
547        let v1 = m.ingest("tools/list", &req, &r1).await.unwrap();
548        assert_eq!(v1.version, 1);
549        assert!(m.ingest("tools/list", &req, &r2).await.is_none());
550    }
551
552    #[tokio::test]
553    async fn ingest__reordering_only_does_not_bump_version() {
554        let m = manager();
555        let req = tools_list_req(None);
556        let r1 = tools_list_resp(json!([{"name": "a"}, {"name": "b"}, {"name": "c"}]), None);
557        let r2 = tools_list_resp(json!([{"name": "c"}, {"name": "a"}, {"name": "b"}]), None);
558
559        m.ingest("tools/list", &req, &r1).await.unwrap();
560        assert!(m.ingest("tools/list", &req, &r2).await.is_none());
561    }
562
563    #[tokio::test]
564    async fn ingest__input_schema_change_bumps_version() {
565        let m = manager();
566        let req = tools_list_req(None);
567        let r1 = tools_list_resp(
568            json!([{"name": "a", "inputSchema": {"type": "object"}}]),
569            None,
570        );
571        let r2 = tools_list_resp(
572            json!([{
573                "name": "a",
574                "inputSchema": {"type": "object", "properties": {"q": {"type": "string"}}}
575            }]),
576            None,
577        );
578
579        let v1 = m.ingest("tools/list", &req, &r1).await.unwrap();
580        let v2 = m.ingest("tools/list", &req, &r2).await.unwrap();
581        assert_eq!(v1.version, 1);
582        assert_eq!(v2.version, 2);
583    }
584
585    #[tokio::test]
586    async fn ingest__changed_payload_increments_version() {
587        let m = manager();
588        let req = tools_list_req(None);
589        let r1 = tools_list_resp(json!([{"name": "a"}]), None);
590        let v1 = m.ingest("tools/list", &req, &r1).await.unwrap();
591        assert_eq!(v1.version, 1);
592
593        let r2 = tools_list_resp(json!([{"name": "a"}, {"name": "b"}]), None);
594        let v2 = m.ingest("tools/list", &req, &r2).await.unwrap();
595        assert_eq!(v2.version, 2);
596    }
597
598    #[tokio::test]
599    async fn ingest__clears_stale_on_new_version() {
600        let m = manager();
601        let req = tools_list_req(None);
602        let r1 = tools_list_resp(json!([{"name": "a"}]), None);
603        m.ingest("tools/list", &req, &r1).await.unwrap();
604
605        m.mark_stale("tools/list");
606        assert!(m.is_stale("tools/list"));
607
608        let r2 = tools_list_resp(json!([{"name": "a"}, {"name": "b"}]), None);
609        m.ingest("tools/list", &req, &r2).await.unwrap();
610        assert!(!m.is_stale("tools/list"));
611    }
612
613    #[tokio::test]
614    async fn ingest__no_result_returns_none() {
615        let m = manager();
616        let req = tools_list_req(None);
617        let err_resp =
618            json!({"jsonrpc": "2.0", "id": 1, "error": {"code": -32603, "message": "x"}});
619        assert!(m.ingest("tools/list", &req, &err_resp).await.is_none());
620    }
621
622    #[tokio::test]
623    async fn mark_stale__and_is_stale_idempotent() {
624        let m = manager();
625        assert!(!m.is_stale("tools/list"));
626        m.mark_stale("tools/list");
627        let first = m.stale_since("tools/list");
628        m.mark_stale("tools/list");
629        let second = m.stale_since("tools/list");
630        assert!(m.is_stale("tools/list"));
631        assert_eq!(first, second);
632    }
633
634    #[tokio::test]
635    async fn list_tools__empty_when_no_version() {
636        let m = manager();
637        assert!(m.list_tools().await.is_empty());
638    }
639
640    #[tokio::test]
641    async fn list_tools__returns_items_from_latest() {
642        let m = manager();
643        let req = tools_list_req(None);
644        let resp = tools_list_resp(json!([{"name": "a"}, {"name": "b"}]), None);
645        m.ingest("tools/list", &req, &resp).await.unwrap();
646
647        let tools = m.list_tools().await;
648        assert_eq!(tools.len(), 2);
649        assert_eq!(tools[0]["name"], "a");
650        assert_eq!(tools[1]["name"], "b");
651    }
652
653    #[tokio::test]
654    async fn get_tool__by_name_hit_and_miss() {
655        let m = manager();
656        let req = tools_list_req(None);
657        let resp = tools_list_resp(json!([{"name": "search", "description": "find"}]), None);
658        m.ingest("tools/list", &req, &resp).await.unwrap();
659
660        let hit = m.get_tool("search").await.unwrap();
661        assert_eq!(hit["description"], "find");
662        assert!(m.get_tool("missing").await.is_none());
663    }
664
665    #[tokio::test]
666    async fn get_resource__by_uri() {
667        let m = manager();
668        let req = json!({"jsonrpc": "2.0", "id": 1, "method": "resources/list"});
669        let resp = json!({
670            "jsonrpc": "2.0", "id": 1,
671            "result": {"resources": [{"uri": "file://a", "name": "A"}]}
672        });
673        m.ingest("resources/list", &req, &resp).await.unwrap();
674        let r = m.get_resource("file://a").await.unwrap();
675        assert_eq!(r["name"], "A");
676    }
677
678    #[tokio::test]
679    async fn warm__seeds_counter_from_store() {
680        let store = MemorySchemaStore::new();
681        let pre = SchemaVersion {
682            id: SchemaVersionId("abc".to_string()),
683            upstream_id: "proxy-1".to_string(),
684            method: "tools/list".to_string(),
685            version: 5,
686            payload: Arc::new(json!({"tools": [{"name": "x"}]})),
687            content_hash: "prior-hash".to_string(),
688            captured_at: Utc::now(),
689        };
690        store.put_version(pre).await;
691
692        let m = SchemaManager::new("proxy-1", store);
693        let req = tools_list_req(None);
694        let resp = tools_list_resp(json!([{"name": "y"}]), None);
695        let v = m.ingest("tools/list", &req, &resp).await.unwrap();
696        assert_eq!(v.version, 6);
697    }
698
699    #[tokio::test]
700    async fn latest__returns_current_version() {
701        let m = manager();
702        let req = tools_list_req(None);
703        let resp = tools_list_resp(json!([{"name": "a"}]), None);
704        m.ingest("tools/list", &req, &resp).await.unwrap();
705        let latest = m.latest("tools/list").await.unwrap();
706        assert_eq!(latest.version, 1);
707    }
708
709    #[tokio::test]
710    async fn list_resource_templates__walks_template_key() {
711        let m = manager();
712        let req = json!({"jsonrpc": "2.0", "id": 1, "method": "resources/templates/list"});
713        let resp = json!({
714            "jsonrpc": "2.0", "id": 1,
715            "result": {"resourceTemplates": [{"uriTemplate": "file://{id}", "name": "f"}]}
716        });
717        m.ingest("resources/templates/list", &req, &resp)
718            .await
719            .unwrap();
720        let items = m.list_resource_templates().await;
721        assert_eq!(items.len(), 1);
722        assert_eq!(items[0]["name"], "f");
723    }
724
725    #[tokio::test]
726    async fn upstream_id__accessor() {
727        let m = manager();
728        assert_eq!(m.upstream_id(), "proxy-1");
729    }
730}