1use std::sync::Arc;
10use std::sync::atomic::{AtomicUsize, Ordering};
11
12use chrono::{DateTime, Utc};
13use dashmap::DashMap;
14use serde_json::Value;
15use tokio::sync::Notify;
16
17use super::store::SchemaStore;
18use super::version::{SchemaVersion, SchemaVersionId, hash_payload};
19use crate::protocol::schema::{PageStatus, canonical_hash_view, detect_page_status, merge_pages};
20
21#[derive(Default)]
24struct PendingTracker {
25 count: AtomicUsize,
26 notify: Notify,
27}
28
29impl PendingTracker {
30 fn begin(&self) {
31 self.count.fetch_add(1, Ordering::SeqCst);
32 }
33 fn end(&self) {
34 if self.count.fetch_sub(1, Ordering::SeqCst) == 1 {
35 self.notify.notify_waiters();
36 }
37 }
38 async fn wait_idle(&self) {
39 while self.count.load(Ordering::SeqCst) > 0 {
40 let notified = self.notify.notified();
41 if self.count.load(Ordering::SeqCst) == 0 {
42 return;
43 }
44 notified.await;
45 }
46 }
47}
48
49#[derive(Default)]
54struct MethodState {
55 page_buffer: Vec<Value>,
56 current_hash: Option<String>,
57 next_version_number: u32,
58 stale: bool,
59 stale_since: Option<DateTime<Utc>>,
60}
61
62pub struct SchemaManager<S: SchemaStore> {
68 upstream_id: String,
69 store: S,
70 state: Arc<DashMap<String, MethodState>>,
71 pending: Arc<PendingTracker>,
72}
73
74impl<S: SchemaStore> SchemaManager<S> {
75 pub fn new(upstream_id: impl Into<String>, store: S) -> Self {
76 Self {
77 upstream_id: upstream_id.into(),
78 store,
79 state: Arc::new(DashMap::new()),
80 pending: Arc::new(PendingTracker::default()),
81 }
82 }
83
84 pub async fn wait_idle(&self) {
89 self.pending.wait_idle().await;
90 }
91
92 pub fn upstream_id(&self) -> &str {
93 &self.upstream_id
94 }
95
96 pub async fn warm(&self, method: &str) {
102 let latest = self
103 .store
104 .latest_version_for_method(&self.upstream_id, method)
105 .await;
106 if let Some(latest) = latest {
107 let mut entry = self.state.entry(method.to_string()).or_default();
108 if entry.current_hash.is_none() {
109 entry.current_hash = Some(latest.content_hash.clone());
110 entry.next_version_number = latest.version + 1;
111 }
112 }
113 }
114
115 pub async fn preload(&self, version: SchemaVersion) {
128 {
129 let mut entry = self.state.entry(version.method.clone()).or_default();
130 if entry.current_hash.is_some() {
131 return;
132 }
133 entry.current_hash = Some(version.content_hash.clone());
134 entry.next_version_number = version.version.saturating_add(1);
135 }
136 self.store.put_version(version).await;
137 }
138
139 pub async fn ingest(
149 &self,
150 method: &str,
151 request_body: &Value,
152 response_body: &Value,
153 ) -> Option<SchemaVersion> {
154 let result = response_body.get("result")?;
155 let status = detect_page_status(request_body, response_body);
156
157 let merged = {
158 let mut entry = self.state.entry(method.to_string()).or_default();
159 entry.page_buffer.push(result.clone());
160 match status {
161 PageStatus::Complete | PageStatus::LastPage => {
162 let pages = std::mem::take(&mut entry.page_buffer);
163 merge_pages(method, &pages)
164 .unwrap_or_else(|| pages.into_iter().next().unwrap_or(Value::Null))
165 }
166 PageStatus::FirstPage | PageStatus::MiddlePage => return None,
167 }
168 };
169
170 let hash = match canonical_hash_view(method, &merged) {
171 Some(view) => hash_payload(&view),
172 None => hash_payload(&merged),
173 };
174
175 let needs_warm = self
176 .state
177 .get(method)
178 .map(|e| e.current_hash.is_none() && e.next_version_number == 0)
179 .unwrap_or(true);
180 if needs_warm {
181 self.warm(method).await;
182 }
183
184 let (same, version_number) = {
185 let mut entry = self.state.entry(method.to_string()).or_default();
186 if entry.current_hash.as_deref() == Some(hash.as_str()) {
187 (true, 0)
188 } else {
189 let num = entry.next_version_number.max(1);
190 entry.current_hash = Some(hash.clone());
191 entry.next_version_number = num.saturating_add(1);
192 entry.stale = false;
193 entry.stale_since = None;
194 (false, num)
195 }
196 };
197
198 if same {
199 return None;
200 }
201
202 let id = SchemaVersionId(hash.chars().take(16).collect());
203 let version = SchemaVersion {
204 id,
205 upstream_id: self.upstream_id.clone(),
206 method: method.to_string(),
207 version: version_number,
208 payload: Arc::new(merged),
209 content_hash: hash,
210 captured_at: Utc::now(),
211 };
212 Some(self.store.put_version(version).await)
213 }
214
215 pub fn spawn_ingest<F>(
225 self: &Arc<Self>,
226 method: String,
227 request_body: Value,
228 response_body: Value,
229 on_version: F,
230 ) where
231 F: FnOnce(&SchemaVersion) + Send + 'static,
232 {
233 self.pending.begin();
234 let manager = Arc::clone(self);
235 tokio::spawn(async move {
236 let result = manager.ingest(&method, &request_body, &response_body).await;
237 if let Some(version) = result.as_ref() {
238 on_version(version);
239 }
240 manager.pending.end();
241 });
242 }
243
244 pub async fn latest(&self, method: &str) -> Option<SchemaVersion> {
247 self.store
248 .latest_version_for_method(&self.upstream_id, method)
249 .await
250 }
251
252 pub async fn list_tools(&self) -> Vec<Value> {
253 self.list_items("tools/list", "tools").await
254 }
255
256 pub async fn list_resources(&self) -> Vec<Value> {
257 self.list_items("resources/list", "resources").await
258 }
259
260 pub async fn list_resource_templates(&self) -> Vec<Value> {
261 self.list_items("resources/templates/list", "resourceTemplates")
262 .await
263 }
264
265 pub async fn list_prompts(&self) -> Vec<Value> {
266 self.list_items("prompts/list", "prompts").await
267 }
268
269 pub async fn get_tool(&self, name: &str) -> Option<Value> {
270 self.find_item_by_field("tools/list", "tools", "name", name)
271 .await
272 }
273
274 pub async fn get_resource(&self, uri: &str) -> Option<Value> {
275 self.find_item_by_field("resources/list", "resources", "uri", uri)
276 .await
277 }
278
279 pub async fn get_prompt(&self, name: &str) -> Option<Value> {
280 self.find_item_by_field("prompts/list", "prompts", "name", name)
281 .await
282 }
283
284 pub fn mark_stale(&self, method: &str) {
290 let mut entry = self.state.entry(method.to_string()).or_default();
291 if !entry.stale {
292 entry.stale = true;
293 entry.stale_since = Some(Utc::now());
294 }
295 }
296
297 pub fn is_stale(&self, method: &str) -> bool {
298 self.state.get(method).map(|e| e.stale).unwrap_or(false)
299 }
300
301 pub fn stale_since(&self, method: &str) -> Option<DateTime<Utc>> {
302 self.state.get(method).and_then(|e| e.stale_since)
303 }
304
305 async fn list_items(&self, method: &str, array_key: &str) -> Vec<Value> {
308 let Some(latest) = self.latest(method).await else {
309 return Vec::new();
310 };
311 latest
312 .payload
313 .get(array_key)
314 .and_then(|v| v.as_array())
315 .cloned()
316 .unwrap_or_default()
317 }
318
319 async fn find_item_by_field(
320 &self,
321 method: &str,
322 array_key: &str,
323 field: &str,
324 needle: &str,
325 ) -> Option<Value> {
326 let latest = self.latest(method).await?;
327 let arr = latest.payload.get(array_key).and_then(|v| v.as_array())?;
328 arr.iter()
329 .find(|item| item.get(field).and_then(|v| v.as_str()) == Some(needle))
330 .cloned()
331 }
332}
333
334#[cfg(test)]
335#[allow(non_snake_case)]
336mod tests {
337 use super::*;
338 use crate::protocol::schema_manager::store::MemorySchemaStore;
339 use serde_json::json;
340
341 fn manager() -> SchemaManager<MemorySchemaStore> {
342 SchemaManager::new("proxy-1", MemorySchemaStore::new())
343 }
344
345 fn tools_list_req(cursor: Option<&str>) -> Value {
346 match cursor {
347 Some(c) => {
348 json!({"jsonrpc": "2.0", "id": 1, "method": "tools/list", "params": {"cursor": c}})
349 }
350 None => json!({"jsonrpc": "2.0", "id": 1, "method": "tools/list"}),
351 }
352 }
353
354 fn tools_list_resp(tools: Value, next_cursor: Option<&str>) -> Value {
355 let mut result = json!({"tools": tools});
356 if let Some(c) = next_cursor {
357 result["nextCursor"] = json!(c);
358 }
359 json!({"jsonrpc": "2.0", "id": 1, "result": result})
360 }
361
362 #[tokio::test]
363 async fn ingest__complete_page_creates_version_one() {
364 let m = manager();
365 let req = tools_list_req(None);
366 let resp = tools_list_resp(json!([{"name": "search"}]), None);
367 let v = m.ingest("tools/list", &req, &resp).await.unwrap();
368 assert_eq!(v.version, 1);
369 assert_eq!(v.method, "tools/list");
370 assert_eq!(v.upstream_id, "proxy-1");
371 }
372
373 #[tokio::test]
374 async fn ingest__first_page_buffers_returns_none() {
375 let m = manager();
376 let req = tools_list_req(None);
377 let resp = tools_list_resp(json!([{"name": "a"}]), Some("cur1"));
378 assert!(m.ingest("tools/list", &req, &resp).await.is_none());
379 }
380
381 #[tokio::test]
382 async fn ingest__first_middle_last_chain_merges_once() {
383 let m = manager();
384
385 let r1 = tools_list_resp(json!([{"name": "a"}]), Some("c1"));
386 assert!(
387 m.ingest("tools/list", &tools_list_req(None), &r1)
388 .await
389 .is_none()
390 );
391
392 let r2 = tools_list_resp(json!([{"name": "b"}]), Some("c2"));
393 assert!(
394 m.ingest("tools/list", &tools_list_req(Some("c1")), &r2)
395 .await
396 .is_none()
397 );
398
399 let r3 = tools_list_resp(json!([{"name": "c"}]), None);
400 let v = m
401 .ingest("tools/list", &tools_list_req(Some("c2")), &r3)
402 .await
403 .unwrap();
404
405 let names: Vec<&str> = v.payload["tools"]
406 .as_array()
407 .unwrap()
408 .iter()
409 .map(|t| t["name"].as_str().unwrap())
410 .collect();
411 assert_eq!(names, vec!["a", "b", "c"]);
412 assert_eq!(v.version, 1);
413 }
414
415 #[tokio::test]
416 async fn ingest__unchanged_payload_returns_none() {
417 let m = manager();
418 let req = tools_list_req(None);
419 let resp = tools_list_resp(json!([{"name": "a"}]), None);
420 m.ingest("tools/list", &req, &resp).await.unwrap();
421 assert!(m.ingest("tools/list", &req, &resp).await.is_none());
422 }
423
424 #[tokio::test]
425 async fn preload__seeds_hash_and_version_counter() {
426 let m = manager();
431 let req = tools_list_req(None);
432 let stored = json!({"tools": [{"name": "a"}]});
433 let version = SchemaVersion {
434 id: SchemaVersionId("preload-seed-123".to_string()),
435 upstream_id: "proxy-1".to_string(),
436 method: "tools/list".to_string(),
437 version: 3,
438 payload: Arc::new(stored.clone()),
439 content_hash: hash_payload(&stored),
440 captured_at: Utc::now(),
441 };
442 m.preload(version).await;
443
444 let same = tools_list_resp(json!([{"name": "a"}]), None);
446 assert!(m.ingest("tools/list", &req, &same).await.is_none());
447
448 let changed = tools_list_resp(json!([{"name": "a"}, {"name": "b"}]), None);
450 let v4 = m.ingest("tools/list", &req, &changed).await.unwrap();
451 assert_eq!(v4.version, 4);
452 }
453
454 #[tokio::test]
455 async fn preload__idempotent_second_call_noop() {
456 let m = manager();
457 let stored = json!({"tools": [{"name": "a"}]});
458 let mk = |v: u32, tag: &str| SchemaVersion {
459 id: SchemaVersionId(format!("id-{tag}")),
460 upstream_id: "proxy-1".to_string(),
461 method: "tools/list".to_string(),
462 version: v,
463 payload: Arc::new(stored.clone()),
464 content_hash: format!("hash-{tag}"),
465 captured_at: Utc::now(),
466 };
467
468 m.preload(mk(3, "first")).await;
469 m.preload(mk(99, "second")).await;
470
471 let req = tools_list_req(None);
474 let changed = tools_list_resp(json!([{"name": "b"}]), None);
475 let v = m.ingest("tools/list", &req, &changed).await.unwrap();
476 assert_eq!(v.version, 4);
477 }
478
479 #[tokio::test]
480 async fn preload__makes_list_tools_visible_without_ingest() {
481 let m = manager();
482 let stored = json!({"tools": [{"name": "a"}, {"name": "b"}]});
483 let version = SchemaVersion {
484 id: SchemaVersionId("preload-list".to_string()),
485 upstream_id: "proxy-1".to_string(),
486 method: "tools/list".to_string(),
487 version: 1,
488 payload: Arc::new(stored.clone()),
489 content_hash: hash_payload(&stored),
490 captured_at: Utc::now(),
491 };
492 m.preload(version).await;
493
494 let tools = m.list_tools().await;
495 assert_eq!(tools.len(), 2);
496 assert_eq!(tools[0]["name"], "a");
497 }
498
499 #[tokio::test]
500 async fn ingest__volatile_meta_does_not_create_new_version() {
501 let m = manager();
505 let req = tools_list_req(None);
506
507 let r1 = json!({
508 "jsonrpc": "2.0", "id": 1,
509 "result": {
510 "tools": [{"name": "a"}],
511 "_meta": {"requestId": "uuid-1"}
512 }
513 });
514 let r2 = json!({
515 "jsonrpc": "2.0", "id": 1,
516 "result": {
517 "tools": [{"name": "a"}],
518 "_meta": {"requestId": "uuid-2"}
519 }
520 });
521
522 let v1 = m.ingest("tools/list", &req, &r1).await.unwrap();
523 assert_eq!(v1.version, 1);
524 assert!(
525 m.ingest("tools/list", &req, &r2).await.is_none(),
526 "different _meta with identical tools must not mint a new version"
527 );
528 }
529
530 #[tokio::test]
531 async fn ingest__description_only_change_does_not_bump_version() {
532 let m = manager();
537 let req = tools_list_req(None);
538 let r1 = tools_list_resp(
539 json!([{"name": "a", "description": "old", "inputSchema": {"type": "object"}}]),
540 None,
541 );
542 let r2 = tools_list_resp(
543 json!([{"name": "a", "description": "new", "inputSchema": {"type": "object"}}]),
544 None,
545 );
546
547 let v1 = m.ingest("tools/list", &req, &r1).await.unwrap();
548 assert_eq!(v1.version, 1);
549 assert!(m.ingest("tools/list", &req, &r2).await.is_none());
550 }
551
552 #[tokio::test]
553 async fn ingest__reordering_only_does_not_bump_version() {
554 let m = manager();
555 let req = tools_list_req(None);
556 let r1 = tools_list_resp(json!([{"name": "a"}, {"name": "b"}, {"name": "c"}]), None);
557 let r2 = tools_list_resp(json!([{"name": "c"}, {"name": "a"}, {"name": "b"}]), None);
558
559 m.ingest("tools/list", &req, &r1).await.unwrap();
560 assert!(m.ingest("tools/list", &req, &r2).await.is_none());
561 }
562
563 #[tokio::test]
564 async fn ingest__input_schema_change_bumps_version() {
565 let m = manager();
566 let req = tools_list_req(None);
567 let r1 = tools_list_resp(
568 json!([{"name": "a", "inputSchema": {"type": "object"}}]),
569 None,
570 );
571 let r2 = tools_list_resp(
572 json!([{
573 "name": "a",
574 "inputSchema": {"type": "object", "properties": {"q": {"type": "string"}}}
575 }]),
576 None,
577 );
578
579 let v1 = m.ingest("tools/list", &req, &r1).await.unwrap();
580 let v2 = m.ingest("tools/list", &req, &r2).await.unwrap();
581 assert_eq!(v1.version, 1);
582 assert_eq!(v2.version, 2);
583 }
584
585 #[tokio::test]
586 async fn ingest__changed_payload_increments_version() {
587 let m = manager();
588 let req = tools_list_req(None);
589 let r1 = tools_list_resp(json!([{"name": "a"}]), None);
590 let v1 = m.ingest("tools/list", &req, &r1).await.unwrap();
591 assert_eq!(v1.version, 1);
592
593 let r2 = tools_list_resp(json!([{"name": "a"}, {"name": "b"}]), None);
594 let v2 = m.ingest("tools/list", &req, &r2).await.unwrap();
595 assert_eq!(v2.version, 2);
596 }
597
598 #[tokio::test]
599 async fn ingest__clears_stale_on_new_version() {
600 let m = manager();
601 let req = tools_list_req(None);
602 let r1 = tools_list_resp(json!([{"name": "a"}]), None);
603 m.ingest("tools/list", &req, &r1).await.unwrap();
604
605 m.mark_stale("tools/list");
606 assert!(m.is_stale("tools/list"));
607
608 let r2 = tools_list_resp(json!([{"name": "a"}, {"name": "b"}]), None);
609 m.ingest("tools/list", &req, &r2).await.unwrap();
610 assert!(!m.is_stale("tools/list"));
611 }
612
613 #[tokio::test]
614 async fn ingest__no_result_returns_none() {
615 let m = manager();
616 let req = tools_list_req(None);
617 let err_resp =
618 json!({"jsonrpc": "2.0", "id": 1, "error": {"code": -32603, "message": "x"}});
619 assert!(m.ingest("tools/list", &req, &err_resp).await.is_none());
620 }
621
622 #[tokio::test]
623 async fn mark_stale__and_is_stale_idempotent() {
624 let m = manager();
625 assert!(!m.is_stale("tools/list"));
626 m.mark_stale("tools/list");
627 let first = m.stale_since("tools/list");
628 m.mark_stale("tools/list");
629 let second = m.stale_since("tools/list");
630 assert!(m.is_stale("tools/list"));
631 assert_eq!(first, second);
632 }
633
634 #[tokio::test]
635 async fn list_tools__empty_when_no_version() {
636 let m = manager();
637 assert!(m.list_tools().await.is_empty());
638 }
639
640 #[tokio::test]
641 async fn list_tools__returns_items_from_latest() {
642 let m = manager();
643 let req = tools_list_req(None);
644 let resp = tools_list_resp(json!([{"name": "a"}, {"name": "b"}]), None);
645 m.ingest("tools/list", &req, &resp).await.unwrap();
646
647 let tools = m.list_tools().await;
648 assert_eq!(tools.len(), 2);
649 assert_eq!(tools[0]["name"], "a");
650 assert_eq!(tools[1]["name"], "b");
651 }
652
653 #[tokio::test]
654 async fn get_tool__by_name_hit_and_miss() {
655 let m = manager();
656 let req = tools_list_req(None);
657 let resp = tools_list_resp(json!([{"name": "search", "description": "find"}]), None);
658 m.ingest("tools/list", &req, &resp).await.unwrap();
659
660 let hit = m.get_tool("search").await.unwrap();
661 assert_eq!(hit["description"], "find");
662 assert!(m.get_tool("missing").await.is_none());
663 }
664
665 #[tokio::test]
666 async fn get_resource__by_uri() {
667 let m = manager();
668 let req = json!({"jsonrpc": "2.0", "id": 1, "method": "resources/list"});
669 let resp = json!({
670 "jsonrpc": "2.0", "id": 1,
671 "result": {"resources": [{"uri": "file://a", "name": "A"}]}
672 });
673 m.ingest("resources/list", &req, &resp).await.unwrap();
674 let r = m.get_resource("file://a").await.unwrap();
675 assert_eq!(r["name"], "A");
676 }
677
678 #[tokio::test]
679 async fn warm__seeds_counter_from_store() {
680 let store = MemorySchemaStore::new();
681 let pre = SchemaVersion {
682 id: SchemaVersionId("abc".to_string()),
683 upstream_id: "proxy-1".to_string(),
684 method: "tools/list".to_string(),
685 version: 5,
686 payload: Arc::new(json!({"tools": [{"name": "x"}]})),
687 content_hash: "prior-hash".to_string(),
688 captured_at: Utc::now(),
689 };
690 store.put_version(pre).await;
691
692 let m = SchemaManager::new("proxy-1", store);
693 let req = tools_list_req(None);
694 let resp = tools_list_resp(json!([{"name": "y"}]), None);
695 let v = m.ingest("tools/list", &req, &resp).await.unwrap();
696 assert_eq!(v.version, 6);
697 }
698
699 #[tokio::test]
700 async fn latest__returns_current_version() {
701 let m = manager();
702 let req = tools_list_req(None);
703 let resp = tools_list_resp(json!([{"name": "a"}]), None);
704 m.ingest("tools/list", &req, &resp).await.unwrap();
705 let latest = m.latest("tools/list").await.unwrap();
706 assert_eq!(latest.version, 1);
707 }
708
709 #[tokio::test]
710 async fn list_resource_templates__walks_template_key() {
711 let m = manager();
712 let req = json!({"jsonrpc": "2.0", "id": 1, "method": "resources/templates/list"});
713 let resp = json!({
714 "jsonrpc": "2.0", "id": 1,
715 "result": {"resourceTemplates": [{"uriTemplate": "file://{id}", "name": "f"}]}
716 });
717 m.ingest("resources/templates/list", &req, &resp)
718 .await
719 .unwrap();
720 let items = m.list_resource_templates().await;
721 assert_eq!(items.len(), 1);
722 assert_eq!(items[0]["name"], "f");
723 }
724
725 #[tokio::test]
726 async fn upstream_id__accessor() {
727 let m = manager();
728 assert_eq!(m.upstream_id(), "proxy-1");
729 }
730}