1use std::collections::HashMap;
2use std::path::{Path, PathBuf};
3use std::sync::Arc;
4use std::time::{SystemTime, UNIX_EPOCH};
5
6use reqwest::header::{HeaderMap, HeaderName, HeaderValue, ACCEPT, CONTENT_TYPE};
7use serde::{Deserialize, Serialize};
8use serde_json::{json, Value};
9use sha2::{Digest, Sha256};
10use tandem_types::ToolResult;
11use tokio::process::{Child, Command};
12use tokio::sync::{Mutex, RwLock};
13
14const MCP_PROTOCOL_VERSION: &str = "2025-11-25";
15const MCP_CLIENT_NAME: &str = "tandem";
16const MCP_CLIENT_VERSION: &str = env!("CARGO_PKG_VERSION");
17const MCP_AUTH_REPROBE_COOLDOWN_MS: u64 = 15_000;
18
19#[derive(Debug, Clone, Serialize, Deserialize)]
20pub struct McpToolCacheEntry {
21 pub tool_name: String,
22 pub description: String,
23 #[serde(default)]
24 pub input_schema: Value,
25 pub fetched_at_ms: u64,
26 pub schema_hash: String,
27}
28
29#[derive(Debug, Clone, Serialize, Deserialize)]
30pub struct McpServer {
31 pub name: String,
32 pub transport: String,
33 #[serde(default = "default_enabled")]
34 pub enabled: bool,
35 pub connected: bool,
36 #[serde(skip_serializing_if = "Option::is_none")]
37 pub pid: Option<u32>,
38 #[serde(skip_serializing_if = "Option::is_none")]
39 pub last_error: Option<String>,
40 #[serde(default, skip_serializing_if = "Option::is_none")]
41 pub last_auth_challenge: Option<McpAuthChallenge>,
42 #[serde(default, skip_serializing_if = "Option::is_none")]
43 pub mcp_session_id: Option<String>,
44 #[serde(default)]
45 pub headers: HashMap<String, String>,
46 #[serde(default)]
47 pub tool_cache: Vec<McpToolCacheEntry>,
48 #[serde(default, skip_serializing_if = "Option::is_none")]
49 pub tools_fetched_at_ms: Option<u64>,
50 #[serde(default, skip_serializing_if = "HashMap::is_empty")]
51 pub pending_auth_by_tool: HashMap<String, PendingMcpAuth>,
52}
53
54#[derive(Debug, Clone, Serialize, Deserialize)]
55pub struct McpAuthChallenge {
56 pub challenge_id: String,
57 pub tool_name: String,
58 pub authorization_url: String,
59 pub message: String,
60 pub requested_at_ms: u64,
61 pub status: String,
62}
63
64#[derive(Debug, Clone, Serialize, Deserialize)]
65pub struct PendingMcpAuth {
66 pub challenge_id: String,
67 pub authorization_url: String,
68 pub message: String,
69 pub status: String,
70 pub first_seen_ms: u64,
71 pub last_probe_ms: u64,
72}
73
74#[derive(Debug, Clone, Serialize, Deserialize)]
75pub struct McpRemoteTool {
76 pub server_name: String,
77 pub tool_name: String,
78 pub namespaced_name: String,
79 pub description: String,
80 #[serde(default)]
81 pub input_schema: Value,
82 pub fetched_at_ms: u64,
83 pub schema_hash: String,
84}
85
86#[derive(Clone)]
87pub struct McpRegistry {
88 servers: Arc<RwLock<HashMap<String, McpServer>>>,
89 processes: Arc<Mutex<HashMap<String, Child>>>,
90 state_file: Arc<PathBuf>,
91}
92
93impl McpRegistry {
94 pub fn new() -> Self {
95 Self::new_with_state_file(resolve_state_file())
96 }
97
98 pub fn new_with_state_file(state_file: PathBuf) -> Self {
99 let loaded = load_state(&state_file)
100 .into_iter()
101 .map(|(k, mut v)| {
102 v.connected = false;
103 v.pid = None;
104 if v.name.trim().is_empty() {
105 v.name = k.clone();
106 }
107 if v.headers.is_empty() {
108 v.headers = HashMap::new();
109 }
110 (k, v)
111 })
112 .collect::<HashMap<_, _>>();
113 Self {
114 servers: Arc::new(RwLock::new(loaded)),
115 processes: Arc::new(Mutex::new(HashMap::new())),
116 state_file: Arc::new(state_file),
117 }
118 }
119
120 pub async fn list(&self) -> HashMap<String, McpServer> {
121 self.servers.read().await.clone()
122 }
123
124 pub async fn add(&self, name: String, transport: String) {
125 self.add_or_update(name, transport, HashMap::new(), true)
126 .await;
127 }
128
129 pub async fn add_or_update(
130 &self,
131 name: String,
132 transport: String,
133 headers: HashMap<String, String>,
134 enabled: bool,
135 ) {
136 let mut servers = self.servers.write().await;
137 let existing = servers.get(&name).cloned();
138 let preserve_cache = existing
139 .as_ref()
140 .is_some_and(|row| row.transport == transport && row.headers == headers);
141 let existing_tool_cache = if preserve_cache {
142 existing
143 .as_ref()
144 .map(|row| row.tool_cache.clone())
145 .unwrap_or_default()
146 } else {
147 Vec::new()
148 };
149 let existing_fetched_at = if preserve_cache {
150 existing.as_ref().and_then(|row| row.tools_fetched_at_ms)
151 } else {
152 None
153 };
154 let server = McpServer {
155 name: name.clone(),
156 transport,
157 enabled,
158 connected: false,
159 pid: None,
160 last_error: None,
161 last_auth_challenge: None,
162 mcp_session_id: None,
163 headers,
164 tool_cache: existing_tool_cache,
165 tools_fetched_at_ms: existing_fetched_at,
166 pending_auth_by_tool: HashMap::new(),
167 };
168 servers.insert(name, server);
169 drop(servers);
170 self.persist_state().await;
171 }
172
173 pub async fn set_enabled(&self, name: &str, enabled: bool) -> bool {
174 let mut servers = self.servers.write().await;
175 let Some(server) = servers.get_mut(name) else {
176 return false;
177 };
178 server.enabled = enabled;
179 if !enabled {
180 server.connected = false;
181 server.pid = None;
182 server.last_auth_challenge = None;
183 server.mcp_session_id = None;
184 server.pending_auth_by_tool.clear();
185 }
186 drop(servers);
187 if !enabled {
188 if let Some(mut child) = self.processes.lock().await.remove(name) {
189 let _ = child.kill().await;
190 let _ = child.wait().await;
191 }
192 }
193 self.persist_state().await;
194 true
195 }
196
197 pub async fn remove(&self, name: &str) -> bool {
198 let removed = {
199 let mut servers = self.servers.write().await;
200 servers.remove(name).is_some()
201 };
202 if !removed {
203 return false;
204 }
205
206 if let Some(mut child) = self.processes.lock().await.remove(name) {
207 let _ = child.kill().await;
208 let _ = child.wait().await;
209 }
210 self.persist_state().await;
211 true
212 }
213
214 pub async fn connect(&self, name: &str) -> bool {
215 let server = {
216 let servers = self.servers.read().await;
217 let Some(server) = servers.get(name) else {
218 return false;
219 };
220 server.clone()
221 };
222
223 if !server.enabled {
224 let mut servers = self.servers.write().await;
225 if let Some(entry) = servers.get_mut(name) {
226 entry.connected = false;
227 entry.pid = None;
228 entry.last_error = Some("MCP server is disabled".to_string());
229 entry.last_auth_challenge = None;
230 entry.mcp_session_id = None;
231 entry.pending_auth_by_tool.clear();
232 }
233 drop(servers);
234 self.persist_state().await;
235 return false;
236 }
237
238 if let Some(command_text) = parse_stdio_transport(&server.transport) {
239 return self.connect_stdio(name, command_text).await;
240 }
241
242 if parse_remote_endpoint(&server.transport).is_some() {
243 return self.refresh(name).await.is_ok();
244 }
245
246 let mut servers = self.servers.write().await;
247 if let Some(entry) = servers.get_mut(name) {
248 entry.connected = true;
249 entry.pid = None;
250 entry.last_error = None;
251 entry.last_auth_challenge = None;
252 entry.mcp_session_id = None;
253 entry.pending_auth_by_tool.clear();
254 }
255 drop(servers);
256 self.persist_state().await;
257 true
258 }
259
260 pub async fn refresh(&self, name: &str) -> Result<Vec<McpRemoteTool>, String> {
261 let server = {
262 let servers = self.servers.read().await;
263 let Some(server) = servers.get(name) else {
264 return Err("MCP server not found".to_string());
265 };
266 server.clone()
267 };
268
269 if !server.enabled {
270 return Err("MCP server is disabled".to_string());
271 }
272
273 let endpoint = parse_remote_endpoint(&server.transport)
274 .ok_or_else(|| "MCP refresh currently supports HTTP/S transports only".to_string())?;
275
276 let (tools, session_id) = match self.discover_remote_tools(&endpoint, &server.headers).await
277 {
278 Ok(result) => result,
279 Err(err) => {
280 let mut servers = self.servers.write().await;
281 if let Some(entry) = servers.get_mut(name) {
282 entry.connected = false;
283 entry.pid = None;
284 entry.last_error = Some(err.clone());
285 entry.last_auth_challenge = None;
286 entry.mcp_session_id = None;
287 entry.pending_auth_by_tool.clear();
288 entry.tool_cache.clear();
289 entry.tools_fetched_at_ms = None;
290 }
291 drop(servers);
292 self.persist_state().await;
293 return Err(err);
294 }
295 };
296
297 let now = now_ms();
298 let cache = tools
299 .iter()
300 .map(|tool| McpToolCacheEntry {
301 tool_name: tool.tool_name.clone(),
302 description: tool.description.clone(),
303 input_schema: tool.input_schema.clone(),
304 fetched_at_ms: now,
305 schema_hash: schema_hash(&tool.input_schema),
306 })
307 .collect::<Vec<_>>();
308
309 let mut servers = self.servers.write().await;
310 if let Some(entry) = servers.get_mut(name) {
311 entry.connected = true;
312 entry.pid = None;
313 entry.last_error = None;
314 entry.last_auth_challenge = None;
315 entry.mcp_session_id = session_id;
316 entry.tool_cache = cache;
317 entry.tools_fetched_at_ms = Some(now);
318 entry.pending_auth_by_tool.clear();
319 }
320 drop(servers);
321 self.persist_state().await;
322 Ok(self.server_tools(name).await)
323 }
324
325 pub async fn disconnect(&self, name: &str) -> bool {
326 if let Some(mut child) = self.processes.lock().await.remove(name) {
327 let _ = child.kill().await;
328 let _ = child.wait().await;
329 }
330 let mut servers = self.servers.write().await;
331 if let Some(server) = servers.get_mut(name) {
332 server.connected = false;
333 server.pid = None;
334 server.last_auth_challenge = None;
335 server.mcp_session_id = None;
336 server.pending_auth_by_tool.clear();
337 drop(servers);
338 self.persist_state().await;
339 return true;
340 }
341 false
342 }
343
344 pub async fn list_tools(&self) -> Vec<McpRemoteTool> {
345 let mut out = self
346 .servers
347 .read()
348 .await
349 .values()
350 .filter(|server| server.enabled && server.connected)
351 .flat_map(server_tool_rows)
352 .collect::<Vec<_>>();
353 out.sort_by(|a, b| a.namespaced_name.cmp(&b.namespaced_name));
354 out
355 }
356
357 pub async fn server_tools(&self, name: &str) -> Vec<McpRemoteTool> {
358 let Some(server) = self.servers.read().await.get(name).cloned() else {
359 return Vec::new();
360 };
361 let mut rows = server_tool_rows(&server);
362 rows.sort_by(|a, b| a.namespaced_name.cmp(&b.namespaced_name));
363 rows
364 }
365
366 pub async fn call_tool(
367 &self,
368 server_name: &str,
369 tool_name: &str,
370 args: Value,
371 ) -> Result<ToolResult, String> {
372 let server = {
373 let servers = self.servers.read().await;
374 let Some(server) = servers.get(server_name) else {
375 return Err(format!("MCP server '{server_name}' not found"));
376 };
377 server.clone()
378 };
379
380 if !server.enabled {
381 return Err(format!("MCP server '{server_name}' is disabled"));
382 }
383 if !server.connected {
384 return Err(format!("MCP server '{server_name}' is not connected"));
385 }
386
387 let endpoint = parse_remote_endpoint(&server.transport).ok_or_else(|| {
388 "MCP tools/call currently supports HTTP/S transports only".to_string()
389 })?;
390 let canonical_tool = canonical_tool_key(tool_name);
391 let now = now_ms();
392 if let Some(blocked) = pending_auth_short_circuit(
393 &server,
394 &canonical_tool,
395 tool_name,
396 now,
397 MCP_AUTH_REPROBE_COOLDOWN_MS,
398 ) {
399 return Ok(ToolResult {
400 output: blocked.output,
401 metadata: json!({
402 "server": server_name,
403 "tool": tool_name,
404 "result": Value::Null,
405 "mcpAuth": blocked.mcp_auth
406 }),
407 });
408 }
409 let normalized_args = normalize_mcp_tool_args(&server, tool_name, args);
410
411 {
412 let mut servers = self.servers.write().await;
413 if let Some(row) = servers.get_mut(server_name) {
414 if let Some(pending) = row.pending_auth_by_tool.get_mut(&canonical_tool) {
415 pending.last_probe_ms = now;
416 }
417 }
418 }
419
420 let request = json!({
421 "jsonrpc": "2.0",
422 "id": format!("call-{}-{}", server_name, now_ms()),
423 "method": "tools/call",
424 "params": {
425 "name": tool_name,
426 "arguments": normalized_args
427 }
428 });
429 let (response, session_id) = post_json_rpc_with_session(
430 &endpoint,
431 &server.headers,
432 request,
433 server.mcp_session_id.as_deref(),
434 )
435 .await?;
436 if session_id.is_some() {
437 let mut servers = self.servers.write().await;
438 if let Some(row) = servers.get_mut(server_name) {
439 row.mcp_session_id = session_id;
440 }
441 drop(servers);
442 self.persist_state().await;
443 }
444
445 if let Some(err) = response.get("error") {
446 if let Some(challenge) = extract_auth_challenge(err, tool_name) {
447 let output = format!(
448 "{}\n\nAuthorize here: {}",
449 challenge.message, challenge.authorization_url
450 );
451 {
452 let mut servers = self.servers.write().await;
453 if let Some(row) = servers.get_mut(server_name) {
454 row.last_auth_challenge = Some(challenge.clone());
455 row.last_error = None;
456 row.pending_auth_by_tool.insert(
457 canonical_tool.clone(),
458 pending_auth_from_challenge(&challenge),
459 );
460 }
461 }
462 self.persist_state().await;
463 return Ok(ToolResult {
464 output,
465 metadata: json!({
466 "server": server_name,
467 "tool": tool_name,
468 "result": Value::Null,
469 "mcpAuth": {
470 "required": true,
471 "challengeId": challenge.challenge_id,
472 "tool": challenge.tool_name,
473 "authorizationUrl": challenge.authorization_url,
474 "message": challenge.message,
475 "status": challenge.status
476 }
477 }),
478 });
479 }
480 let message = err
481 .get("message")
482 .and_then(|v| v.as_str())
483 .unwrap_or("MCP tools/call failed");
484 return Err(message.to_string());
485 }
486
487 let result = response.get("result").cloned().unwrap_or(Value::Null);
488 let auth_challenge = extract_auth_challenge(&result, tool_name);
489 let output = if let Some(challenge) = auth_challenge.as_ref() {
490 format!(
491 "{}\n\nAuthorize here: {}",
492 challenge.message, challenge.authorization_url
493 )
494 } else {
495 result
496 .get("content")
497 .map(render_mcp_content)
498 .or_else(|| result.get("output").map(|v| v.to_string()))
499 .unwrap_or_else(|| result.to_string())
500 };
501
502 {
503 let mut servers = self.servers.write().await;
504 if let Some(row) = servers.get_mut(server_name) {
505 row.last_auth_challenge = auth_challenge.clone();
506 if let Some(challenge) = auth_challenge.as_ref() {
507 row.pending_auth_by_tool.insert(
508 canonical_tool.clone(),
509 pending_auth_from_challenge(challenge),
510 );
511 } else {
512 row.pending_auth_by_tool.remove(&canonical_tool);
513 }
514 }
515 }
516 self.persist_state().await;
517
518 let auth_metadata = auth_challenge.as_ref().map(|challenge| {
519 json!({
520 "required": true,
521 "challengeId": challenge.challenge_id,
522 "tool": challenge.tool_name,
523 "authorizationUrl": challenge.authorization_url,
524 "message": challenge.message,
525 "status": challenge.status
526 })
527 });
528
529 Ok(ToolResult {
530 output,
531 metadata: json!({
532 "server": server_name,
533 "tool": tool_name,
534 "result": result,
535 "mcpAuth": auth_metadata
536 }),
537 })
538 }
539
540 async fn connect_stdio(&self, name: &str, command_text: &str) -> bool {
541 match spawn_stdio_process(command_text).await {
542 Ok(child) => {
543 let pid = child.id();
544 self.processes.lock().await.insert(name.to_string(), child);
545 let mut servers = self.servers.write().await;
546 if let Some(server) = servers.get_mut(name) {
547 server.connected = true;
548 server.pid = pid;
549 server.last_error = None;
550 server.last_auth_challenge = None;
551 server.pending_auth_by_tool.clear();
552 }
553 drop(servers);
554 self.persist_state().await;
555 true
556 }
557 Err(err) => {
558 let mut servers = self.servers.write().await;
559 if let Some(server) = servers.get_mut(name) {
560 server.connected = false;
561 server.pid = None;
562 server.last_error = Some(err);
563 server.last_auth_challenge = None;
564 server.pending_auth_by_tool.clear();
565 }
566 drop(servers);
567 self.persist_state().await;
568 false
569 }
570 }
571 }
572
573 async fn discover_remote_tools(
574 &self,
575 endpoint: &str,
576 headers: &HashMap<String, String>,
577 ) -> Result<(Vec<McpRemoteTool>, Option<String>), String> {
578 let initialize = json!({
579 "jsonrpc": "2.0",
580 "id": "initialize-1",
581 "method": "initialize",
582 "params": {
583 "protocolVersion": MCP_PROTOCOL_VERSION,
584 "capabilities": {},
585 "clientInfo": {
586 "name": MCP_CLIENT_NAME,
587 "version": MCP_CLIENT_VERSION,
588 }
589 }
590 });
591 let (init_response, mut session_id) =
592 post_json_rpc_with_session(endpoint, headers, initialize, None).await?;
593 if let Some(err) = init_response.get("error") {
594 let message = err
595 .get("message")
596 .and_then(|v| v.as_str())
597 .unwrap_or("MCP initialize failed");
598 return Err(message.to_string());
599 }
600
601 let tools_list = json!({
602 "jsonrpc": "2.0",
603 "id": "tools-list-1",
604 "method": "tools/list",
605 "params": {}
606 });
607 let (tools_response, next_session_id) =
608 post_json_rpc_with_session(endpoint, headers, tools_list, session_id.as_deref())
609 .await?;
610 if next_session_id.is_some() {
611 session_id = next_session_id;
612 }
613 if let Some(err) = tools_response.get("error") {
614 let message = err
615 .get("message")
616 .and_then(|v| v.as_str())
617 .unwrap_or("MCP tools/list failed");
618 return Err(message.to_string());
619 }
620
621 let tools = tools_response
622 .get("result")
623 .and_then(|v| v.get("tools"))
624 .and_then(|v| v.as_array())
625 .ok_or_else(|| "MCP tools/list result missing tools array".to_string())?;
626
627 let now = now_ms();
628 let mut out = Vec::new();
629 for row in tools {
630 let Some(tool_name) = row.get("name").and_then(|v| v.as_str()) else {
631 continue;
632 };
633 let description = row
634 .get("description")
635 .and_then(|v| v.as_str())
636 .unwrap_or("")
637 .to_string();
638 let mut input_schema = row
639 .get("inputSchema")
640 .or_else(|| row.get("input_schema"))
641 .cloned()
642 .unwrap_or_else(|| json!({"type":"object"}));
643 normalize_tool_input_schema(&mut input_schema);
644 out.push(McpRemoteTool {
645 server_name: String::new(),
646 tool_name: tool_name.to_string(),
647 namespaced_name: String::new(),
648 description,
649 input_schema,
650 fetched_at_ms: now,
651 schema_hash: String::new(),
652 });
653 }
654
655 Ok((out, session_id))
656 }
657
658 async fn persist_state(&self) {
659 let snapshot = self.servers.read().await.clone();
660 if let Some(parent) = self.state_file.parent() {
661 let _ = tokio::fs::create_dir_all(parent).await;
662 }
663 if let Ok(payload) = serde_json::to_string_pretty(&snapshot) {
664 let _ = tokio::fs::write(self.state_file.as_path(), payload).await;
665 }
666 }
667}
668
669impl Default for McpRegistry {
670 fn default() -> Self {
671 Self::new()
672 }
673}
674
675fn default_enabled() -> bool {
676 true
677}
678
679fn resolve_state_file() -> PathBuf {
680 if let Ok(path) = std::env::var("TANDEM_MCP_REGISTRY") {
681 return PathBuf::from(path);
682 }
683 if let Ok(state_dir) = std::env::var("TANDEM_STATE_DIR") {
684 let trimmed = state_dir.trim();
685 if !trimmed.is_empty() {
686 return PathBuf::from(trimmed).join("mcp_servers.json");
687 }
688 }
689 if let Some(data_dir) = dirs::data_dir() {
690 return data_dir
691 .join("tandem")
692 .join("data")
693 .join("mcp_servers.json");
694 }
695 dirs::home_dir()
696 .map(|home| home.join(".tandem").join("data").join("mcp_servers.json"))
697 .unwrap_or_else(|| PathBuf::from("mcp_servers.json"))
698}
699
700fn load_state(path: &Path) -> HashMap<String, McpServer> {
701 let Ok(raw) = std::fs::read_to_string(path) else {
702 return HashMap::new();
703 };
704 serde_json::from_str::<HashMap<String, McpServer>>(&raw).unwrap_or_default()
705}
706
707fn parse_stdio_transport(transport: &str) -> Option<&str> {
708 transport.strip_prefix("stdio:").map(str::trim)
709}
710
711fn parse_remote_endpoint(transport: &str) -> Option<String> {
712 let trimmed = transport.trim();
713 if trimmed.starts_with("http://") || trimmed.starts_with("https://") {
714 return Some(trimmed.to_string());
715 }
716 for prefix in ["http:", "https:"] {
717 if let Some(rest) = trimmed.strip_prefix(prefix) {
718 let endpoint = rest.trim();
719 if endpoint.starts_with("http://") || endpoint.starts_with("https://") {
720 return Some(endpoint.to_string());
721 }
722 }
723 }
724 None
725}
726
727fn server_tool_rows(server: &McpServer) -> Vec<McpRemoteTool> {
728 let server_slug = sanitize_namespace_segment(&server.name);
729 server
730 .tool_cache
731 .iter()
732 .map(|tool| {
733 let tool_slug = sanitize_namespace_segment(&tool.tool_name);
734 McpRemoteTool {
735 server_name: server.name.clone(),
736 tool_name: tool.tool_name.clone(),
737 namespaced_name: format!("mcp.{server_slug}.{tool_slug}"),
738 description: tool.description.clone(),
739 input_schema: tool.input_schema.clone(),
740 fetched_at_ms: tool.fetched_at_ms,
741 schema_hash: tool.schema_hash.clone(),
742 }
743 })
744 .collect()
745}
746
747fn sanitize_namespace_segment(raw: &str) -> String {
748 let mut out = String::new();
749 let mut previous_underscore = false;
750 for ch in raw.trim().chars() {
751 if ch.is_ascii_alphanumeric() {
752 out.push(ch.to_ascii_lowercase());
753 previous_underscore = false;
754 } else if !previous_underscore {
755 out.push('_');
756 previous_underscore = true;
757 }
758 }
759 let cleaned = out.trim_matches('_');
760 if cleaned.is_empty() {
761 "tool".to_string()
762 } else {
763 cleaned.to_string()
764 }
765}
766
767fn schema_hash(schema: &Value) -> String {
768 let payload = serde_json::to_vec(schema).unwrap_or_default();
769 let mut hasher = Sha256::new();
770 hasher.update(payload);
771 format!("{:x}", hasher.finalize())
772}
773
774fn extract_auth_challenge(result: &Value, tool_name: &str) -> Option<McpAuthChallenge> {
775 let authorization_url = find_string_with_priority(
776 result,
777 &[
778 &["structuredContent", "authorization_url"],
779 &["structuredContent", "authorizationUrl"],
780 &["authorization_url"],
781 &["authorizationUrl"],
782 &["auth_url"],
783 ],
784 &["authorization_url", "authorizationUrl", "auth_url"],
785 )?;
786 let raw_message = find_string_with_priority(
787 result,
788 &[
789 &["structuredContent", "message"],
790 &["message"],
791 &["structuredContent", "text"],
792 &["text"],
793 &["llm_instructions"],
794 ],
795 &["message", "text", "llm_instructions"],
796 )
797 .unwrap_or_else(|| "This tool requires authorization before it can run.".to_string());
798 let message = sanitize_auth_message(&raw_message);
799 let challenge_id = stable_id_seed(&format!("{tool_name}:{authorization_url}"));
800 Some(McpAuthChallenge {
801 challenge_id,
802 tool_name: tool_name.to_string(),
803 authorization_url,
804 message,
805 requested_at_ms: now_ms(),
806 status: "pending".to_string(),
807 })
808}
809
810fn find_string_by_any_key(value: &Value, keys: &[&str]) -> Option<String> {
811 match value {
812 Value::Object(map) => {
813 for key in keys {
814 if let Some(s) = map.get(*key).and_then(|v| v.as_str()) {
815 let trimmed = s.trim();
816 if !trimmed.is_empty() {
817 return Some(trimmed.to_string());
818 }
819 }
820 }
821 for child in map.values() {
822 if let Some(found) = find_string_by_any_key(child, keys) {
823 return Some(found);
824 }
825 }
826 None
827 }
828 Value::Array(items) => items
829 .iter()
830 .find_map(|item| find_string_by_any_key(item, keys)),
831 _ => None,
832 }
833}
834
835fn find_string_with_priority(
836 value: &Value,
837 paths: &[&[&str]],
838 fallback_keys: &[&str],
839) -> Option<String> {
840 for path in paths {
841 if let Some(found) = find_string_at_path(value, path) {
842 return Some(found);
843 }
844 }
845 find_string_by_any_key(value, fallback_keys)
846}
847
848fn find_string_at_path(value: &Value, path: &[&str]) -> Option<String> {
849 let mut current = value;
850 for segment in path {
851 current = current.get(*segment)?;
852 }
853 let s = current.as_str()?.trim();
854 if s.is_empty() {
855 None
856 } else {
857 Some(s.to_string())
858 }
859}
860
861fn sanitize_auth_message(raw: &str) -> String {
862 let trimmed = raw.trim();
863 if trimmed.is_empty() {
864 return "This tool requires authorization before it can run.".to_string();
865 }
866 if let Some((head, _)) = trimmed.split_once("Authorize here:") {
867 let head = head.trim();
868 if !head.is_empty() {
869 return truncate_text(head, 280);
870 }
871 }
872 let no_newlines = trimmed.replace(['\r', '\n'], " ");
873 truncate_text(no_newlines.trim(), 280)
874}
875
876fn truncate_text(input: &str, max_chars: usize) -> String {
877 if input.chars().count() <= max_chars {
878 return input.to_string();
879 }
880 let truncated = input.chars().take(max_chars).collect::<String>();
881 format!("{truncated}...")
882}
883
884fn stable_id_seed(seed: &str) -> String {
885 let mut hasher = Sha256::new();
886 hasher.update(seed.as_bytes());
887 let encoded = format!("{:x}", hasher.finalize());
888 encoded.chars().take(16).collect()
889}
890
891fn canonical_tool_key(tool_name: &str) -> String {
892 tool_name.trim().to_ascii_lowercase()
893}
894
895fn pending_auth_from_challenge(challenge: &McpAuthChallenge) -> PendingMcpAuth {
896 PendingMcpAuth {
897 challenge_id: challenge.challenge_id.clone(),
898 authorization_url: challenge.authorization_url.clone(),
899 message: challenge.message.clone(),
900 status: challenge.status.clone(),
901 first_seen_ms: challenge.requested_at_ms,
902 last_probe_ms: challenge.requested_at_ms,
903 }
904}
905
906struct PendingAuthShortCircuit {
907 output: String,
908 mcp_auth: Value,
909}
910
911fn pending_auth_short_circuit(
912 server: &McpServer,
913 tool_key: &str,
914 tool_name: &str,
915 now_ms_value: u64,
916 cooldown_ms: u64,
917) -> Option<PendingAuthShortCircuit> {
918 let pending = server.pending_auth_by_tool.get(tool_key)?;
919 let elapsed = now_ms_value.saturating_sub(pending.last_probe_ms);
920 if elapsed >= cooldown_ms {
921 return None;
922 }
923 let retry_after_ms = cooldown_ms.saturating_sub(elapsed);
924 let output = format!(
925 "Authorization pending for `{}`.\n{}\n\nAuthorize here: {}\nRetry after {}s.",
926 tool_name,
927 pending.message,
928 pending.authorization_url,
929 retry_after_ms.div_ceil(1000)
930 );
931 Some(PendingAuthShortCircuit {
932 output,
933 mcp_auth: json!({
934 "required": true,
935 "pending": true,
936 "blocked": true,
937 "retryAfterMs": retry_after_ms,
938 "challengeId": pending.challenge_id,
939 "tool": tool_name,
940 "authorizationUrl": pending.authorization_url,
941 "message": pending.message,
942 "status": pending.status
943 }),
944 })
945}
946
947fn normalize_tool_input_schema(schema: &mut Value) {
948 normalize_schema_node(schema);
949}
950
951fn normalize_schema_node(node: &mut Value) {
952 let Some(obj) = node.as_object_mut() else {
953 return;
954 };
955
956 if let Some(enum_values) = obj.get("enum").and_then(|v| v.as_array()) {
960 let all_strings = enum_values.iter().all(|v| v.is_string());
961 let string_like_type = schema_type_allows_string_enum(obj.get("type"));
962 if !all_strings || !string_like_type {
963 obj.remove("enum");
964 }
965 }
966
967 if let Some(properties) = obj.get_mut("properties").and_then(|v| v.as_object_mut()) {
968 for value in properties.values_mut() {
969 normalize_schema_node(value);
970 }
971 }
972
973 if let Some(items) = obj.get_mut("items") {
974 normalize_schema_node(items);
975 }
976
977 for key in ["anyOf", "oneOf", "allOf"] {
978 if let Some(array) = obj.get_mut(key).and_then(|v| v.as_array_mut()) {
979 for child in array.iter_mut() {
980 normalize_schema_node(child);
981 }
982 }
983 }
984
985 if let Some(additional) = obj.get_mut("additionalProperties") {
986 normalize_schema_node(additional);
987 }
988}
989
990fn schema_type_allows_string_enum(schema_type: Option<&Value>) -> bool {
991 let Some(schema_type) = schema_type else {
992 return true;
994 };
995
996 if let Some(kind) = schema_type.as_str() {
997 return kind == "string";
998 }
999
1000 if let Some(kinds) = schema_type.as_array() {
1001 let mut saw_string = false;
1002 for kind in kinds {
1003 let Some(kind) = kind.as_str() else {
1004 return false;
1005 };
1006 if kind == "string" {
1007 saw_string = true;
1008 continue;
1009 }
1010 if kind != "null" {
1011 return false;
1012 }
1013 }
1014 return saw_string;
1015 }
1016
1017 false
1018}
1019
1020fn now_ms() -> u64 {
1021 SystemTime::now()
1022 .duration_since(UNIX_EPOCH)
1023 .map(|d| d.as_millis() as u64)
1024 .unwrap_or(0)
1025}
1026
1027fn build_headers(headers: &HashMap<String, String>) -> Result<HeaderMap, String> {
1028 let mut map = HeaderMap::new();
1029 map.insert(
1030 ACCEPT,
1031 HeaderValue::from_static("application/json, text/event-stream"),
1032 );
1033 map.insert(CONTENT_TYPE, HeaderValue::from_static("application/json"));
1034 for (key, value) in headers {
1035 let name = HeaderName::from_bytes(key.trim().as_bytes())
1036 .map_err(|e| format!("Invalid header name '{key}': {e}"))?;
1037 let header = HeaderValue::from_str(value.trim())
1038 .map_err(|e| format!("Invalid header value for '{key}': {e}"))?;
1039 map.insert(name, header);
1040 }
1041 Ok(map)
1042}
1043
1044async fn post_json_rpc_with_session(
1045 endpoint: &str,
1046 headers: &HashMap<String, String>,
1047 request: Value,
1048 session_id: Option<&str>,
1049) -> Result<(Value, Option<String>), String> {
1050 let client = reqwest::Client::builder()
1051 .timeout(std::time::Duration::from_secs(12))
1052 .build()
1053 .map_err(|e| format!("Failed to build HTTP client: {e}"))?;
1054 let mut req = client.post(endpoint).headers(build_headers(headers)?);
1055 if let Some(id) = session_id {
1056 let trimmed = id.trim();
1057 if !trimmed.is_empty() {
1058 req = req.header("Mcp-Session-Id", trimmed);
1059 }
1060 }
1061 let response = req
1062 .json(&request)
1063 .send()
1064 .await
1065 .map_err(|e| format!("MCP request failed: {e}"))?;
1066 let response_session_id = response
1067 .headers()
1068 .get("mcp-session-id")
1069 .and_then(|v| v.to_str().ok())
1070 .map(|v| v.trim().to_string())
1071 .filter(|v| !v.is_empty());
1072 let status = response.status();
1073 let payload = response
1074 .text()
1075 .await
1076 .map_err(|e| format!("Failed to read MCP response: {e}"))?;
1077 if !status.is_success() {
1078 return Err(format!(
1079 "MCP endpoint returned HTTP {}: {}",
1080 status.as_u16(),
1081 payload.chars().take(400).collect::<String>()
1082 ));
1083 }
1084 let value = serde_json::from_str::<Value>(&payload)
1085 .map_err(|e| format!("Invalid MCP JSON response: {e}"))?;
1086 Ok((value, response_session_id))
1087}
1088
1089fn render_mcp_content(value: &Value) -> String {
1090 let Some(items) = value.as_array() else {
1091 return value.to_string();
1092 };
1093 let mut chunks = Vec::new();
1094 for item in items {
1095 if let Some(text) = item.get("text").and_then(|v| v.as_str()) {
1096 chunks.push(text.to_string());
1097 continue;
1098 }
1099 chunks.push(item.to_string());
1100 }
1101 if chunks.is_empty() {
1102 value.to_string()
1103 } else {
1104 chunks.join("\n")
1105 }
1106}
1107
1108fn normalize_mcp_tool_args(server: &McpServer, tool_name: &str, raw_args: Value) -> Value {
1109 let Some(schema) = server
1110 .tool_cache
1111 .iter()
1112 .find(|row| row.tool_name.eq_ignore_ascii_case(tool_name))
1113 .map(|row| &row.input_schema)
1114 else {
1115 return raw_args;
1116 };
1117
1118 let mut args_obj = match raw_args {
1119 Value::Object(obj) => obj,
1120 other => return other,
1121 };
1122
1123 let properties = schema
1124 .get("properties")
1125 .and_then(|v| v.as_object())
1126 .cloned()
1127 .unwrap_or_default();
1128 if properties.is_empty() {
1129 return Value::Object(args_obj);
1130 }
1131
1132 let mut normalized_existing: HashMap<String, String> = HashMap::new();
1134 for key in args_obj.keys() {
1135 normalized_existing.insert(normalize_arg_key(key), key.clone());
1136 }
1137
1138 let canonical_keys = properties.keys().cloned().collect::<Vec<_>>();
1140 for canonical in &canonical_keys {
1141 if args_obj.contains_key(canonical) {
1142 continue;
1143 }
1144 if let Some(existing_key) = normalized_existing.get(&normalize_arg_key(canonical)) {
1145 if let Some(value) = args_obj.get(existing_key).cloned() {
1146 args_obj.insert(canonical.clone(), value);
1147 }
1148 }
1149 }
1150
1151 let required = schema
1153 .get("required")
1154 .and_then(|v| v.as_array())
1155 .map(|arr| {
1156 arr.iter()
1157 .filter_map(|v| v.as_str().map(str::to_string))
1158 .collect::<Vec<_>>()
1159 })
1160 .unwrap_or_default();
1161
1162 for required_key in required {
1163 if args_obj.contains_key(&required_key) {
1164 continue;
1165 }
1166 if let Some(alias_value) = find_required_alias_value(&required_key, &args_obj) {
1167 args_obj.insert(required_key, alias_value);
1168 }
1169 }
1170
1171 Value::Object(args_obj)
1172}
1173
1174fn find_required_alias_value(
1175 required_key: &str,
1176 args_obj: &serde_json::Map<String, Value>,
1177) -> Option<Value> {
1178 let mut alias_candidates = vec![
1179 required_key.to_string(),
1180 required_key.to_ascii_lowercase(),
1181 required_key.replace('_', ""),
1182 ];
1183
1184 if required_key.contains("title") {
1186 alias_candidates.extend([
1187 "name".to_string(),
1188 "title".to_string(),
1189 "task_name".to_string(),
1190 "taskname".to_string(),
1191 ]);
1192 }
1193
1194 if let Some(base) = required_key.strip_suffix("_id") {
1196 alias_candidates.extend([base.to_string(), format!("{base}id"), format!("{base}_id")]);
1197 }
1198
1199 let mut by_normalized: HashMap<String, &Value> = HashMap::new();
1200 for (key, value) in args_obj {
1201 by_normalized.insert(normalize_arg_key(key), value);
1202 }
1203
1204 alias_candidates
1205 .into_iter()
1206 .find_map(|candidate| by_normalized.get(&normalize_arg_key(&candidate)).cloned())
1207 .cloned()
1208}
1209
1210fn normalize_arg_key(key: &str) -> String {
1211 key.chars()
1212 .filter(|ch| ch.is_ascii_alphanumeric())
1213 .map(|ch| ch.to_ascii_lowercase())
1214 .collect()
1215}
1216
1217async fn spawn_stdio_process(command_text: &str) -> Result<Child, String> {
1218 if command_text.is_empty() {
1219 return Err("Missing stdio command".to_string());
1220 }
1221 #[cfg(windows)]
1222 let mut command = {
1223 let mut cmd = Command::new("powershell");
1224 cmd.args(["-NoProfile", "-Command", command_text]);
1225 cmd
1226 };
1227 #[cfg(not(windows))]
1228 let mut command = {
1229 let mut cmd = Command::new("sh");
1230 cmd.args(["-lc", command_text]);
1231 cmd
1232 };
1233 command
1234 .stdin(std::process::Stdio::null())
1235 .stdout(std::process::Stdio::null())
1236 .stderr(std::process::Stdio::null());
1237 command.spawn().map_err(|e| e.to_string())
1238}
1239
1240#[cfg(test)]
1241mod tests {
1242 use super::*;
1243 use uuid::Uuid;
1244
1245 #[tokio::test]
1246 async fn add_connect_disconnect_non_stdio_server() {
1247 let file = std::env::temp_dir().join(format!("mcp-test-{}.json", Uuid::new_v4()));
1248 let registry = McpRegistry::new_with_state_file(file);
1249 registry
1250 .add("example".to_string(), "sse:https://example.com".to_string())
1251 .await;
1252 assert!(registry.connect("example").await);
1253 let listed = registry.list().await;
1254 assert!(listed.get("example").map(|s| s.connected).unwrap_or(false));
1255 assert!(registry.disconnect("example").await);
1256 }
1257
1258 #[test]
1259 fn parse_remote_endpoint_supports_http_prefixes() {
1260 assert_eq!(
1261 parse_remote_endpoint("https://mcp.example.com/mcp"),
1262 Some("https://mcp.example.com/mcp".to_string())
1263 );
1264 assert_eq!(
1265 parse_remote_endpoint("http:https://mcp.example.com/mcp"),
1266 Some("https://mcp.example.com/mcp".to_string())
1267 );
1268 }
1269
1270 #[test]
1271 fn normalize_schema_removes_non_string_enums_recursively() {
1272 let mut schema = json!({
1273 "type": "object",
1274 "properties": {
1275 "good": { "type": "string", "enum": ["a", "b"] },
1276 "good_nullable": { "type": ["string", "null"], "enum": ["asc", "desc"] },
1277 "bad_object": { "type": "object", "enum": ["asc", "desc"] },
1278 "bad_array": { "type": "array", "enum": ["asc", "desc"] },
1279 "bad_number": { "type": "number", "enum": [1, 2] },
1280 "bad_mixed": { "enum": ["ok", 1] },
1281 "nested": {
1282 "type": "object",
1283 "properties": {
1284 "child": { "enum": [true, false] }
1285 }
1286 }
1287 }
1288 });
1289
1290 normalize_tool_input_schema(&mut schema);
1291
1292 assert!(
1293 schema["properties"]["good"]["enum"].is_array(),
1294 "string enums should be preserved"
1295 );
1296 assert!(
1297 schema["properties"]["good_nullable"]["enum"].is_array(),
1298 "string|null enums should be preserved"
1299 );
1300 assert!(
1301 schema["properties"]["bad_object"]["enum"].is_null(),
1302 "object enums should be dropped"
1303 );
1304 assert!(
1305 schema["properties"]["bad_array"]["enum"].is_null(),
1306 "array enums should be dropped"
1307 );
1308 assert!(
1309 schema["properties"]["bad_number"]["enum"].is_null(),
1310 "non-string enums should be dropped"
1311 );
1312 assert!(
1313 schema["properties"]["bad_mixed"]["enum"].is_null(),
1314 "mixed enums should be dropped"
1315 );
1316 assert!(
1317 schema["properties"]["nested"]["properties"]["child"]["enum"].is_null(),
1318 "recursive non-string enums should be dropped"
1319 );
1320 }
1321
1322 #[test]
1323 fn extract_auth_challenge_from_result_payload() {
1324 let payload = json!({
1325 "content": [
1326 {
1327 "type": "text",
1328 "llm_instructions": "Authorize Gmail access first.",
1329 "authorization_url": "https://example.com/oauth/start"
1330 }
1331 ]
1332 });
1333 let challenge = extract_auth_challenge(&payload, "gmail_whoami")
1334 .expect("auth challenge should be detected");
1335 assert_eq!(challenge.tool_name, "gmail_whoami");
1336 assert_eq!(
1337 challenge.authorization_url,
1338 "https://example.com/oauth/start"
1339 );
1340 assert_eq!(challenge.status, "pending");
1341 }
1342
1343 #[test]
1344 fn extract_auth_challenge_returns_none_without_url() {
1345 let payload = json!({
1346 "content": [
1347 {"type":"text","text":"No authorization needed"}
1348 ]
1349 });
1350 assert!(extract_auth_challenge(&payload, "gmail_whoami").is_none());
1351 }
1352
1353 #[test]
1354 fn extract_auth_challenge_prefers_structured_content_message() {
1355 let payload = json!({
1356 "content": [
1357 {
1358 "type": "text",
1359 "text": "{\"authorization_url\":\"https://example.com/oauth\",\"message\":\"json blob\"}"
1360 }
1361 ],
1362 "structuredContent": {
1363 "authorization_url": "https://example.com/oauth",
1364 "message": "Authorize Reddit access first."
1365 }
1366 });
1367 let challenge = extract_auth_challenge(&payload, "reddit_getmyusername")
1368 .expect("auth challenge should be detected");
1369 assert_eq!(challenge.message, "Authorize Reddit access first.");
1370 }
1371
1372 #[test]
1373 fn sanitize_auth_message_compacts_llm_instructions() {
1374 let raw = "Please show the following link to the end user formatted as markdown: https://example.com/auth\nInform the end user that this tool requires authorization.";
1375 let message = sanitize_auth_message(raw);
1376 assert!(!message.contains('\n'));
1377 assert!(message.len() <= 283);
1378 }
1379
1380 #[test]
1381 fn normalize_mcp_tool_args_maps_clickup_aliases() {
1382 let server = McpServer {
1383 name: "arcade".to_string(),
1384 transport: "https://example.com/mcp".to_string(),
1385 enabled: true,
1386 connected: true,
1387 pid: None,
1388 last_error: None,
1389 last_auth_challenge: None,
1390 mcp_session_id: None,
1391 headers: HashMap::new(),
1392 tool_cache: vec![McpToolCacheEntry {
1393 tool_name: "Clickup_CreateTask".to_string(),
1394 description: "Create task".to_string(),
1395 input_schema: json!({
1396 "type":"object",
1397 "properties":{
1398 "list_id":{"type":"string"},
1399 "task_title":{"type":"string"}
1400 },
1401 "required":["list_id","task_title"]
1402 }),
1403 fetched_at_ms: 0,
1404 schema_hash: "x".to_string(),
1405 }],
1406 tools_fetched_at_ms: None,
1407 pending_auth_by_tool: HashMap::new(),
1408 };
1409
1410 let normalized = normalize_mcp_tool_args(
1411 &server,
1412 "Clickup_CreateTask",
1413 json!({
1414 "listId": "123",
1415 "name": "Prep fish"
1416 }),
1417 );
1418 assert_eq!(
1419 normalized.get("list_id").and_then(|v| v.as_str()),
1420 Some("123")
1421 );
1422 assert_eq!(
1423 normalized.get("task_title").and_then(|v| v.as_str()),
1424 Some("Prep fish")
1425 );
1426 }
1427
1428 #[test]
1429 fn normalize_arg_key_ignores_case_and_separators() {
1430 assert_eq!(normalize_arg_key("task_title"), "tasktitle");
1431 assert_eq!(normalize_arg_key("taskTitle"), "tasktitle");
1432 assert_eq!(normalize_arg_key("task-title"), "tasktitle");
1433 }
1434
1435 #[test]
1436 fn pending_auth_blocks_retries_within_cooldown() {
1437 let mut server = McpServer {
1438 name: "arcade".to_string(),
1439 transport: "https://example.com/mcp".to_string(),
1440 enabled: true,
1441 connected: true,
1442 pid: None,
1443 last_error: None,
1444 last_auth_challenge: None,
1445 mcp_session_id: None,
1446 headers: HashMap::new(),
1447 tool_cache: Vec::new(),
1448 tools_fetched_at_ms: None,
1449 pending_auth_by_tool: HashMap::new(),
1450 };
1451 server.pending_auth_by_tool.insert(
1452 "clickup_whoami".to_string(),
1453 PendingMcpAuth {
1454 challenge_id: "abc".to_string(),
1455 authorization_url: "https://example.com/auth".to_string(),
1456 message: "Authorize ClickUp access.".to_string(),
1457 status: "pending".to_string(),
1458 first_seen_ms: 1_000,
1459 last_probe_ms: 2_000,
1460 },
1461 );
1462 let blocked =
1463 pending_auth_short_circuit(&server, "clickup_whoami", "Clickup_WhoAmI", 10_000, 15_000)
1464 .expect("should block");
1465 assert!(blocked.output.contains("Authorization pending"));
1466 assert!(blocked
1467 .mcp_auth
1468 .get("pending")
1469 .and_then(|v| v.as_bool())
1470 .unwrap_or(false));
1471 }
1472
1473 #[test]
1474 fn pending_auth_allows_probe_after_cooldown() {
1475 let mut server = McpServer {
1476 name: "arcade".to_string(),
1477 transport: "https://example.com/mcp".to_string(),
1478 enabled: true,
1479 connected: true,
1480 pid: None,
1481 last_error: None,
1482 last_auth_challenge: None,
1483 mcp_session_id: None,
1484 headers: HashMap::new(),
1485 tool_cache: Vec::new(),
1486 tools_fetched_at_ms: None,
1487 pending_auth_by_tool: HashMap::new(),
1488 };
1489 server.pending_auth_by_tool.insert(
1490 "clickup_whoami".to_string(),
1491 PendingMcpAuth {
1492 challenge_id: "abc".to_string(),
1493 authorization_url: "https://example.com/auth".to_string(),
1494 message: "Authorize ClickUp access.".to_string(),
1495 status: "pending".to_string(),
1496 first_seen_ms: 1_000,
1497 last_probe_ms: 2_000,
1498 },
1499 );
1500 assert!(
1501 pending_auth_short_circuit(&server, "clickup_whoami", "Clickup_WhoAmI", 17_001, 15_000)
1502 .is_none(),
1503 "cooldown elapsed should allow re-probe"
1504 );
1505 }
1506
1507 #[test]
1508 fn pending_auth_is_tool_scoped() {
1509 let mut server = McpServer {
1510 name: "arcade".to_string(),
1511 transport: "https://example.com/mcp".to_string(),
1512 enabled: true,
1513 connected: true,
1514 pid: None,
1515 last_error: None,
1516 last_auth_challenge: None,
1517 mcp_session_id: None,
1518 headers: HashMap::new(),
1519 tool_cache: Vec::new(),
1520 tools_fetched_at_ms: None,
1521 pending_auth_by_tool: HashMap::new(),
1522 };
1523 server.pending_auth_by_tool.insert(
1524 "gmail_sendemail".to_string(),
1525 PendingMcpAuth {
1526 challenge_id: "abc".to_string(),
1527 authorization_url: "https://example.com/auth".to_string(),
1528 message: "Authorize Gmail access.".to_string(),
1529 status: "pending".to_string(),
1530 first_seen_ms: 1_000,
1531 last_probe_ms: 2_000,
1532 },
1533 );
1534 assert!(pending_auth_short_circuit(
1535 &server,
1536 "gmail_sendemail",
1537 "Gmail_SendEmail",
1538 2_100,
1539 15_000
1540 )
1541 .is_some());
1542 assert!(pending_auth_short_circuit(
1543 &server,
1544 "clickup_whoami",
1545 "Clickup_WhoAmI",
1546 2_100,
1547 15_000
1548 )
1549 .is_none());
1550 }
1551}