1use std::sync::Arc;
2
3use serde::{Deserialize, Serialize};
4use serde_json::{Value, json};
5use tokio::sync::Mutex;
6
7use crate::KontextAuthSession;
8use crate::KontextDevClient;
9use crate::KontextDevConfig;
10use crate::KontextDevError;
11
/// Base URL of the Kontext API used when a configuration does not name a server.
pub const DEFAULT_SERVER: &str = "https://api.kontext.dev";
/// HTTP header that carries the MCP session id on requests and on the `initialize` response.
const MCP_SESSION_HEADER: &str = "Mcp-Session-Id";
/// Tool name that `has_meta_gateway_tools` looks for as the search meta tool.
const META_SEARCH_TOOLS: &str = "SEARCH_TOOLS";
/// Tool name that `has_meta_gateway_tools` looks for as the execute meta tool.
const META_EXECUTE_TOOL: &str = "EXECUTE_TOOL";
/// Value sent as `protocolVersion` in the MCP `initialize` request.
const DEFAULT_MCP_PROTOCOL_VERSION: &str = "2025-06-18";
/// `Accept` header value sent with every MCP request: plain JSON or an SSE stream.
const STREAMABLE_HTTP_ACCEPT: &str = "application/json, text/event-stream";
/// Content-type substring that makes a response body be decoded as SSE events.
const STREAM_CONTENT_TYPE: &str = "text/event-stream";
19
/// Reduces a configured server URL to its bare base form.
///
/// Trailing slashes are removed, then a trailing `/api/v1` segment is dropped (if present),
/// then a trailing `/mcp` segment is dropped (if present), and finally any trailing slashes
/// exposed by those removals are trimmed again. Each suffix is removed at most once and only
/// in that order.
pub fn normalize_kontext_server_url(server: &str) -> String {
    let base = server.trim_end_matches('/');
    let base = base.strip_suffix("/api/v1").unwrap_or(base);
    let base = base.strip_suffix("/mcp").unwrap_or(base);
    base.trim_end_matches('/').to_owned()
}
30
/// User-facing settings for [`KontextMcp`].
///
/// `KontextMcp::new` copies most of these values into the `KontextDevConfig` handed to the
/// underlying `KontextDevClient`; the per-field notes describe the fallbacks it applies.
#[derive(Debug, Clone)]
pub struct KontextMcpConfig {
    /// Sent as `clientInfo.sessionId` in the MCP `initialize` request.
    pub client_session_id: String,
    /// Forwarded unchanged as the SDK client id.
    pub client_id: String,
    /// Forwarded unchanged as the SDK redirect URI.
    pub redirect_uri: String,
    /// When set, `KontextMcp::mcp_url` returns this value instead of asking the SDK client.
    pub url: Option<String>,
    /// Server base URL; `DEFAULT_SERVER` is used when `None`. Normalized before use.
    pub server: Option<String>,
    /// Forwarded unchanged as the SDK client secret.
    pub client_secret: Option<String>,
    /// Forwarded as the SDK scope; an empty string is used when `None`.
    pub scope: Option<String>,
    /// Forwarded as the SDK resource; `"mcp-gateway"` is used when `None`.
    pub resource: Option<String>,
    /// Not consumed by `KontextMcp::new` or any other code visible in this file.
    pub session_key: Option<String>,
    /// Forwarded unchanged to the SDK configuration.
    pub integration_ui_url: Option<String>,
    /// Forwarded unchanged to the SDK configuration.
    pub integration_return_to: Option<String>,
    /// Forwarded as the SDK auth timeout; `300` is used when `None`.
    pub auth_timeout_seconds: Option<i64>,
    /// Forwarded to the SDK configuration; `true` is used when `None`.
    pub open_connect_page_on_login: Option<bool>,
    /// Forwarded unchanged to the SDK configuration.
    pub token_cache_path: Option<String>,
}
48
49impl Default for KontextMcpConfig {
50 fn default() -> Self {
51 Self {
52 client_session_id: String::new(),
53 client_id: String::new(),
54 redirect_uri: "http://localhost:3333/callback".to_string(),
55 url: None,
56 server: Some(DEFAULT_SERVER.to_string()),
57 client_secret: None,
58 scope: None,
59 resource: None,
60 session_key: None,
61 integration_ui_url: None,
62 integration_return_to: None,
63 auth_timeout_seconds: None,
64 open_connect_page_on_login: None,
65 token_cache_path: None,
66 }
67 }
68}
69
70#[derive(Clone, Debug, Deserialize, Serialize, PartialEq, Eq)]
71#[serde(rename_all = "snake_case")]
72pub enum RuntimeIntegrationCategory {
73 GatewayRemoteMcp,
74 InternalMcpCredentials,
75}
76
77#[derive(Clone, Debug, Deserialize, Serialize, PartialEq, Eq)]
78#[serde(rename_all = "snake_case")]
79pub enum RuntimeIntegrationConnectType {
80 Oauth,
81 UserToken,
82 Credentials,
83 None,
84}
85
86#[derive(Clone, Debug, Deserialize, Serialize, PartialEq)]
87#[serde(rename_all = "camelCase")]
88pub struct RuntimeIntegrationRecord {
89 pub id: String,
90 pub name: String,
91 pub url: String,
92 pub category: RuntimeIntegrationCategory,
93 pub connect_type: RuntimeIntegrationConnectType,
94 #[serde(skip_serializing_if = "Option::is_none")]
95 pub auth_mode: Option<String>,
96 #[serde(skip_serializing_if = "Option::is_none")]
97 pub credential_schema: Option<serde_json::Value>,
98 #[serde(skip_serializing_if = "Option::is_none")]
99 pub requires_oauth: Option<bool>,
100 #[serde(skip_serializing_if = "Option::is_none")]
101 pub connection: Option<RuntimeIntegrationConnection>,
102}
103
104#[derive(Clone, Debug, Deserialize, Serialize, PartialEq)]
105#[serde(rename_all = "camelCase")]
106pub struct RuntimeIntegrationConnection {
107 pub connected: bool,
108 pub status: String,
109 #[serde(skip_serializing_if = "Option::is_none")]
110 pub expires_at: Option<String>,
111 #[serde(skip_serializing_if = "Option::is_none")]
112 pub display_name: Option<String>,
113}
114
115#[derive(Clone, Debug, Deserialize, Serialize, PartialEq)]
116#[serde(rename_all = "camelCase")]
117pub struct KontextTool {
118 pub id: String,
119 pub name: String,
120 #[serde(skip_serializing_if = "Option::is_none")]
121 pub description: Option<String>,
122 #[serde(skip_serializing_if = "Option::is_none")]
123 pub input_schema: Option<serde_json::Value>,
124 #[serde(skip_serializing_if = "Option::is_none")]
125 pub server: Option<KontextToolServer>,
126}
127
128#[derive(Clone, Debug, Deserialize, Serialize, PartialEq, Eq)]
129#[serde(rename_all = "camelCase")]
130pub struct KontextToolServer {
131 pub id: String,
132 #[serde(skip_serializing_if = "Option::is_none")]
133 pub name: Option<String>,
134}
135
/// Cached MCP session; both fields are `None` until a session has been established.
#[derive(Debug, Default, Clone)]
struct McpSessionState {
    /// Session id obtained from the last successful `initialize` exchange.
    session_id: Option<String>,
    /// Access token the cached session was created with; the cache is reused only for it.
    access_token: Option<String>,
}
141
142#[derive(Clone, Debug)]
143pub struct KontextMcp {
144 config: KontextMcpConfig,
145 client: KontextDevClient,
146 http: reqwest::Client,
147 session: Arc<Mutex<McpSessionState>>,
148}
149
150impl KontextMcp {
151 pub fn new(config: KontextMcpConfig) -> Self {
152 let server =
153 normalize_kontext_server_url(config.server.as_deref().unwrap_or(DEFAULT_SERVER));
154 let sdk_config = KontextDevConfig {
155 server,
156 client_id: config.client_id.clone(),
157 client_secret: config.client_secret.clone(),
158 scope: config.scope.clone().unwrap_or_default(),
159 server_name: "kontext-dev".to_string(),
160 resource: config
161 .resource
162 .clone()
163 .unwrap_or_else(|| "mcp-gateway".to_string()),
164 integration_ui_url: config.integration_ui_url.clone(),
165 integration_return_to: config.integration_return_to.clone(),
166 open_connect_page_on_login: config.open_connect_page_on_login.unwrap_or(true),
167 auth_timeout_seconds: config.auth_timeout_seconds.unwrap_or(300),
168 token_cache_path: config.token_cache_path.clone(),
169 redirect_uri: config.redirect_uri.clone(),
170 };
171
172 Self {
173 config,
174 client: KontextDevClient::new(sdk_config),
175 http: reqwest::Client::new(),
176 session: Arc::new(Mutex::new(McpSessionState::default())),
177 }
178 }
179
180 pub fn client(&self) -> &KontextDevClient {
181 &self.client
182 }
183
184 pub async fn authenticate_mcp(&self) -> Result<KontextAuthSession, KontextDevError> {
185 self.client.authenticate_mcp().await
186 }
187
188 pub fn mcp_url(&self) -> Result<String, KontextDevError> {
189 if let Some(url) = &self.config.url {
190 return Ok(url.clone());
191 }
192 self.client.mcp_url()
193 }
194
195 pub async fn list_integrations(
196 &self,
197 ) -> Result<Vec<RuntimeIntegrationRecord>, KontextDevError> {
198 let session = self.authenticate_mcp().await?;
199 let base = self.client.server_base_url()?;
200 let response = self
201 .http
202 .get(format!("{}/mcp/integrations", base.trim_end_matches('/')))
203 .bearer_auth(session.gateway_token.access_token)
204 .send()
205 .await
206 .map_err(|err| KontextDevError::ConnectSession {
207 message: err.to_string(),
208 })?;
209
210 if !response.status().is_success() {
211 let status = response.status();
212 let body = response.text().await.unwrap_or_default();
213 return Err(KontextDevError::ConnectSession {
214 message: format!("{status}: {body}"),
215 });
216 }
217
218 #[derive(Deserialize)]
219 struct IntegrationsResponse {
220 items: Vec<RuntimeIntegrationRecord>,
221 }
222
223 let payload = response
224 .json::<IntegrationsResponse>()
225 .await
226 .map_err(|err| KontextDevError::ConnectSession {
227 message: err.to_string(),
228 })?;
229
230 Ok(payload.items)
231 }
232
233 pub async fn list_tools(&self) -> Result<Vec<KontextTool>, KontextDevError> {
234 let session = self.authenticate_mcp().await?;
235 self.list_tools_with_access_token(&session.gateway_token.access_token)
236 .await
237 }
238
239 pub async fn list_tools_with_access_token(
240 &self,
241 access_token: &str,
242 ) -> Result<Vec<KontextTool>, KontextDevError> {
243 let result = self
244 .json_rpc_with_session(
245 access_token,
246 "tools/list",
247 json!({}),
248 Some("list-tools"),
249 true,
250 )
251 .await?;
252 parse_tools_list_result(&result)
253 }
254
255 pub async fn call_tool(
256 &self,
257 tool_id: &str,
258 args: Option<serde_json::Map<String, serde_json::Value>>,
259 ) -> Result<serde_json::Value, KontextDevError> {
260 let session = self.authenticate_mcp().await?;
261 self.call_tool_with_access_token(&session.gateway_token.access_token, tool_id, args)
262 .await
263 }
264
265 pub async fn call_tool_with_access_token(
266 &self,
267 access_token: &str,
268 tool_id: &str,
269 args: Option<serde_json::Map<String, serde_json::Value>>,
270 ) -> Result<serde_json::Value, KontextDevError> {
271 self.json_rpc_with_session(
272 access_token,
273 "tools/call",
274 json!({ "name": tool_id, "arguments": args.unwrap_or_default() }),
275 Some("call-tool"),
276 true,
277 )
278 .await
279 }
280
281 async fn json_rpc_with_session(
282 &self,
283 access_token: &str,
284 method: &str,
285 params: Value,
286 id: Option<&str>,
287 allow_session_retry: bool,
288 ) -> Result<Value, KontextDevError> {
289 let max_attempts = if allow_session_retry { 2 } else { 1 };
290 for attempt in 0..max_attempts {
291 let session_id = self.ensure_mcp_session(access_token).await?;
292
293 let response = self
294 .http
295 .post(self.mcp_url()?)
296 .bearer_auth(access_token)
297 .header(reqwest::header::ACCEPT, STREAMABLE_HTTP_ACCEPT)
298 .header(MCP_SESSION_HEADER, &session_id)
299 .json(&json!({
300 "jsonrpc": "2.0",
301 "id": id.unwrap_or("1"),
302 "method": method,
303 "params": params,
304 }))
305 .send()
306 .await
307 .map_err(|err| KontextDevError::ConnectSession {
308 message: err.to_string(),
309 })?;
310
311 if !response.status().is_success() {
312 let status = response.status();
313 let body = response.text().await.unwrap_or_default();
314 let retryable = attempt + 1 < max_attempts && is_invalid_session_error(&body);
315 if retryable {
316 self.invalidate_session().await;
317 continue;
318 }
319 return Err(KontextDevError::ConnectSession {
320 message: format!("{status}: {body}"),
321 });
322 }
323
324 let payload = parse_json_or_streamable_response(response).await?;
325
326 if let Some(error) = payload.get("error") {
327 let message = extract_jsonrpc_error_message(error);
328 let retryable =
329 attempt + 1 < max_attempts && is_invalid_session_error(message.as_str());
330 if retryable {
331 self.invalidate_session().await;
332 continue;
333 }
334 return Err(KontextDevError::ConnectSession { message });
335 }
336
337 return Ok(payload.get("result").cloned().unwrap_or(Value::Null));
338 }
339
340 Err(KontextDevError::ConnectSession {
341 message: "MCP request failed after session retry".to_string(),
342 })
343 }
344
345 async fn ensure_mcp_session(&self, access_token: &str) -> Result<String, KontextDevError> {
346 {
347 let guard = self.session.lock().await;
348 if guard.access_token.as_deref() == Some(access_token)
349 && let Some(session_id) = guard.session_id.clone()
350 {
351 return Ok(session_id);
352 }
353 }
354
355 let initialize_response = self
356 .http
357 .post(self.mcp_url()?)
358 .bearer_auth(access_token)
359 .header(reqwest::header::ACCEPT, STREAMABLE_HTTP_ACCEPT)
360 .json(&json!({
361 "jsonrpc": "2.0",
362 "id": "initialize",
363 "method": "initialize",
364 "params": {
365 "protocolVersion": DEFAULT_MCP_PROTOCOL_VERSION,
366 "capabilities": {
367 "tools": {}
368 },
369 "clientInfo": {
370 "name": "kontext-dev-sdk-rs",
371 "version": env!("CARGO_PKG_VERSION"),
372 "sessionId": self.config.client_session_id
373 }
374 }
375 }))
376 .send()
377 .await
378 .map_err(|err| KontextDevError::ConnectSession {
379 message: err.to_string(),
380 })?;
381
382 if !initialize_response.status().is_success() {
383 let status = initialize_response.status();
384 let body = initialize_response.text().await.unwrap_or_default();
385 return Err(KontextDevError::ConnectSession {
386 message: format!("{status}: {body}"),
387 });
388 }
389
390 let session_header = initialize_response
391 .headers()
392 .get(MCP_SESSION_HEADER)
393 .or_else(|| initialize_response.headers().get("mcp-session-id"))
394 .and_then(|value| value.to_str().ok())
395 .map(|value| value.trim().to_string());
396
397 let initialize_payload = parse_json_or_streamable_response(initialize_response).await?;
398
399 if let Some(error) = initialize_payload.get("error") {
400 return Err(KontextDevError::ConnectSession {
401 message: extract_jsonrpc_error_message(error),
402 });
403 }
404
405 let session_id = session_header
406 .or_else(|| {
407 initialize_payload
408 .get("result")
409 .and_then(|result| result.get("sessionId"))
410 .and_then(|value| value.as_str())
411 .map(|value| value.to_string())
412 })
413 .or_else(|| {
414 initialize_payload
415 .get("result")
416 .and_then(|result| result.get("session_id"))
417 .and_then(|value| value.as_str())
418 .map(|value| value.to_string())
419 })
420 .ok_or_else(|| KontextDevError::ConnectSession {
421 message: "MCP initialize did not return a session id".to_string(),
422 })?;
423
424 let _ = self
427 .http
428 .post(self.mcp_url()?)
429 .bearer_auth(access_token)
430 .header(reqwest::header::ACCEPT, STREAMABLE_HTTP_ACCEPT)
431 .header(MCP_SESSION_HEADER, &session_id)
432 .json(&json!({
433 "jsonrpc": "2.0",
434 "method": "notifications/initialized",
435 "params": {}
436 }))
437 .send()
438 .await;
439
440 {
441 let mut guard = self.session.lock().await;
442 guard.session_id = Some(session_id.clone());
443 guard.access_token = Some(access_token.to_string());
444 }
445
446 Ok(session_id)
447 }
448
449 async fn invalidate_session(&self) {
450 let mut guard = self.session.lock().await;
451 guard.session_id = None;
452 guard.access_token = None;
453 }
454}
455
456fn extract_jsonrpc_error_message(error: &Value) -> String {
457 error
458 .get("message")
459 .and_then(|value| value.as_str())
460 .map(ToString::to_string)
461 .or_else(|| {
462 error
463 .get("error_description")
464 .and_then(|value| value.as_str())
465 .map(ToString::to_string)
466 })
467 .unwrap_or_else(|| error.to_string())
468}
469
/// Reports whether `message` contains one of the phrases used for a rejected MCP session.
///
/// Matching ignores ASCII case.
fn is_invalid_session_error(message: &str) -> bool {
    const MARKERS: [&str; 3] = [
        "no valid session id",
        "no valid session-id",
        "invalid session",
    ];

    let haystack = message.to_ascii_lowercase();
    MARKERS.iter().any(|marker| haystack.contains(marker))
}
476
477async fn parse_json_or_streamable_response(
478 response: reqwest::Response,
479) -> Result<Value, KontextDevError> {
480 let content_type = response
481 .headers()
482 .get(reqwest::header::CONTENT_TYPE)
483 .and_then(|value| value.to_str().ok())
484 .map(|value| value.to_ascii_lowercase())
485 .unwrap_or_default();
486 let body = response
487 .text()
488 .await
489 .map_err(|err| KontextDevError::ConnectSession {
490 message: err.to_string(),
491 })?;
492
493 parse_json_or_streamable_body(&body, &content_type)
494 .map_err(|message| KontextDevError::ConnectSession { message })
495}
496
497fn parse_json_or_streamable_body(body: &str, content_type: &str) -> Result<Value, String> {
498 let parse_json = || serde_json::from_str::<Value>(body).map_err(|err| err.to_string());
499 let parse_sse = || parse_sse_last_json_event(body);
500
501 if content_type.contains(STREAM_CONTENT_TYPE) {
502 return parse_sse().ok_or_else(|| {
503 "failed to parse streamable MCP response as SSE JSON events".to_string()
504 });
505 }
506
507 parse_json().or_else(|json_err| {
508 parse_sse().ok_or_else(|| format!("failed to decode response body: {json_err}"))
509 })
510}
511
512fn parse_sse_last_json_event(body: &str) -> Option<Value> {
513 let mut current_data = Vec::<String>::new();
514 let mut last_json = None;
515
516 let flush_data = |current_data: &mut Vec<String>, last_json: &mut Option<Value>| {
517 if current_data.is_empty() {
518 return;
519 }
520 let data = current_data.join("\n");
521 current_data.clear();
522 let trimmed = data.trim();
523 if trimmed.is_empty() || trimmed == "[DONE]" {
524 return;
525 }
526 if let Ok(value) = serde_json::from_str::<Value>(trimmed) {
527 *last_json = Some(value);
528 }
529 };
530
531 for line in body.lines() {
532 let line = line.trim_end_matches('\r');
533 if line.is_empty() {
534 flush_data(&mut current_data, &mut last_json);
535 continue;
536 }
537 if let Some(data) = line.strip_prefix("data:") {
538 current_data.push(data.trim_start().to_string());
539 continue;
540 }
541 if let Ok(value) = serde_json::from_str::<Value>(line) {
542 last_json = Some(value);
543 }
544 }
545 flush_data(&mut current_data, &mut last_json);
546
547 last_json
548}
549
550pub(crate) fn has_meta_gateway_tools(tools: &[KontextTool]) -> bool {
551 let mut has_search = false;
552 let mut has_execute = false;
553 for tool in tools {
554 if tool.name == META_SEARCH_TOOLS {
555 has_search = true;
556 } else if tool.name == META_EXECUTE_TOOL {
557 has_execute = true;
558 }
559 }
560 has_search && has_execute
561}
562
563pub(crate) fn extract_json_resource_text(result: &Value) -> Option<String> {
564 let content = result.get("content")?.as_array()?;
565 for item in content {
566 if item.get("type").and_then(Value::as_str) != Some("resource") {
567 continue;
568 }
569 let Some(resource) = item.get("resource") else {
570 continue;
571 };
572 if resource.get("mimeType").and_then(Value::as_str) != Some("application/json") {
573 continue;
574 }
575 if let Some(text) = resource.get("text").and_then(Value::as_str) {
576 return Some(text.to_string());
577 }
578 }
579 None
580}
581
582pub(crate) fn extract_text_content(result: &Value) -> String {
583 let Some(content) = result.get("content").and_then(Value::as_array) else {
584 return result.to_string();
585 };
586
587 let mut text_items = Vec::new();
588 for item in content {
589 if item.get("type").and_then(Value::as_str) == Some("text")
590 && let Some(text) = item.get("text").and_then(Value::as_str)
591 {
592 text_items.push(text.to_string());
593 }
594 }
595 if !text_items.is_empty() {
596 return text_items.join("\n");
597 }
598
599 let mut resource_items = Vec::new();
600 for item in content {
601 if item.get("type").and_then(Value::as_str) != Some("resource") {
602 continue;
603 }
604 let Some(resource_text) = item
605 .get("resource")
606 .and_then(|resource| resource.get("text"))
607 .and_then(Value::as_str)
608 else {
609 continue;
610 };
611
612 let parsed = serde_json::from_str::<Value>(resource_text)
613 .ok()
614 .map(|value| extract_text_content(&value))
615 .unwrap_or_else(|| resource_text.to_string());
616 resource_items.push(parsed);
617 }
618
619 if !resource_items.is_empty() {
620 return resource_items.join("\n");
621 }
622
623 content
624 .iter()
625 .map(Value::to_string)
626 .collect::<Vec<_>>()
627 .join("\n")
628}
629
630#[derive(Clone, Debug)]
631pub(crate) struct GatewayToolsPayload {
632 pub tools: Vec<KontextTool>,
633 pub errors: Vec<GatewayToolError>,
634 pub elicitations: Vec<GatewayElicitation>,
635}
636
637#[derive(Clone, Debug, Deserialize, Serialize, PartialEq, Eq)]
638#[serde(rename_all = "camelCase")]
639pub struct GatewayToolError {
640 pub server_id: String,
641 #[serde(default)]
642 pub server_name: Option<String>,
643 #[serde(default)]
644 pub reason: Option<String>,
645}
646
647#[derive(Clone, Debug, Deserialize, Serialize, PartialEq, Eq)]
648#[serde(rename_all = "camelCase")]
649pub struct GatewayElicitation {
650 pub url: String,
651 #[serde(default)]
652 pub message: Option<String>,
653 #[serde(default)]
654 pub integration_id: Option<String>,
655 #[serde(default)]
656 pub integration_name: Option<String>,
657}
658
659#[derive(Clone, Debug, Deserialize)]
660#[serde(rename_all = "camelCase")]
661struct GatewayToolSummary {
662 id: String,
663 name: String,
664 #[serde(default)]
665 description: Option<String>,
666 #[serde(default)]
667 input_schema: Option<Value>,
668 #[serde(default)]
669 server: Option<GatewayToolServer>,
670}
671
672#[derive(Clone, Debug, Deserialize)]
673#[serde(rename_all = "camelCase")]
674struct GatewayToolServer {
675 #[serde(default)]
676 id: Option<String>,
677 #[serde(default)]
678 name: Option<String>,
679}
680
681#[derive(Debug, Deserialize)]
682#[serde(rename_all = "camelCase")]
683struct RawTool {
684 name: String,
685 #[serde(default)]
686 description: Option<String>,
687 #[serde(default)]
688 input_schema: Option<serde_json::Value>,
689}
690
691fn parse_tools_list_result(result: &Value) -> Result<Vec<KontextTool>, KontextDevError> {
692 let tools = result
693 .get("tools")
694 .and_then(|value| value.as_array())
695 .cloned()
696 .unwrap_or_default();
697
698 tools
699 .into_iter()
700 .map(|tool| {
701 let raw: RawTool =
702 serde_json::from_value(tool).map_err(|err| KontextDevError::ConnectSession {
703 message: format!("invalid tool payload: {err}"),
704 })?;
705
706 Ok(KontextTool {
707 id: raw.name.clone(),
708 name: raw.name,
709 description: raw.description,
710 input_schema: raw.input_schema,
711 server: None,
712 })
713 })
714 .collect()
715}
716
717pub(crate) fn parse_gateway_tools_payload(
718 raw: &Value,
719) -> Result<GatewayToolsPayload, KontextDevError> {
720 let json_text =
721 extract_json_resource_text(raw).ok_or_else(|| KontextDevError::ConnectSession {
722 message: "SEARCH_TOOLS did not return JSON resource content".to_string(),
723 })?;
724
725 let parsed = serde_json::from_str::<Value>(&json_text).map_err(|err| {
726 KontextDevError::ConnectSession {
727 message: format!("SEARCH_TOOLS returned invalid JSON: {err}"),
728 }
729 })?;
730
731 if let Some(items) = parsed.as_array() {
732 let tools = items
733 .iter()
734 .cloned()
735 .map(serde_json::from_value::<GatewayToolSummary>)
736 .collect::<Result<Vec<_>, _>>()
737 .map_err(|err| KontextDevError::ConnectSession {
738 message: format!("SEARCH_TOOLS returned invalid tool entry: {err}"),
739 })?
740 .into_iter()
741 .map(to_kontext_gateway_tool)
742 .collect();
743 return Ok(GatewayToolsPayload {
744 tools,
745 errors: Vec::new(),
746 elicitations: Vec::new(),
747 });
748 }
749
750 let Some(obj) = parsed.as_object() else {
751 return Err(KontextDevError::ConnectSession {
752 message: "SEARCH_TOOLS response was not a JSON array or object".to_string(),
753 });
754 };
755
756 let tools = obj
757 .get("items")
758 .and_then(Value::as_array)
759 .cloned()
760 .unwrap_or_default()
761 .into_iter()
762 .map(serde_json::from_value::<GatewayToolSummary>)
763 .collect::<Result<Vec<_>, _>>()
764 .map_err(|err| KontextDevError::ConnectSession {
765 message: format!("SEARCH_TOOLS items contained invalid tool data: {err}"),
766 })?
767 .into_iter()
768 .map(to_kontext_gateway_tool)
769 .collect::<Vec<_>>();
770
771 let errors = obj
772 .get("errors")
773 .and_then(Value::as_array)
774 .cloned()
775 .unwrap_or_default()
776 .into_iter()
777 .filter_map(|value| serde_json::from_value::<GatewayToolError>(value).ok())
778 .collect::<Vec<_>>();
779
780 let elicitations = obj
781 .get("elicitations")
782 .and_then(Value::as_array)
783 .cloned()
784 .unwrap_or_default()
785 .into_iter()
786 .filter_map(|value| serde_json::from_value::<GatewayElicitation>(value).ok())
787 .collect::<Vec<_>>();
788
789 Ok(GatewayToolsPayload {
790 tools,
791 errors,
792 elicitations,
793 })
794}
795
796fn to_kontext_gateway_tool(summary: GatewayToolSummary) -> KontextTool {
797 let server = summary.server.and_then(|server| {
798 server.id.map(|id| KontextToolServer {
799 id,
800 name: server.name,
801 })
802 });
803
804 KontextTool {
805 id: summary.id,
806 name: summary.name,
807 description: summary.description,
808 input_schema: summary.input_schema,
809 server,
810 }
811}
812
#[cfg(test)]
mod tests {
    use super::*;

    /// A plain JSON body with a JSON content type is decoded directly.
    #[test]
    fn parse_json_or_streamable_body_parses_json_payload() {
        let body = r#"{"jsonrpc":"2.0","result":{"ok":true}}"#;

        let parsed =
            parse_json_or_streamable_body(body, "application/json").expect("json should parse");

        assert_eq!(parsed["result"]["ok"], Value::Bool(true));
    }

    /// An SSE body with an SSE content type yields the JSON of its `data:` line.
    #[test]
    fn parse_json_or_streamable_body_parses_sse_payload() {
        let body =
            "event: message\ndata: {\"jsonrpc\":\"2.0\",\"result\":{\"sessionId\":\"abc\"}}\n\n";

        let parsed =
            parse_json_or_streamable_body(body, "text/event-stream").expect("sse should parse");

        let expected = Value::String("abc".to_string());
        assert_eq!(parsed["result"]["sessionId"], expected);
    }

    /// An SSE body mislabeled as JSON is still decoded through the SSE fallback.
    #[test]
    fn parse_json_or_streamable_body_falls_back_to_sse_when_content_type_is_json() {
        let body = "data: {\"jsonrpc\":\"2.0\",\"result\":{\"tools\":[]}}\n\n";

        let parsed = parse_json_or_streamable_body(body, "application/json")
            .expect("sse fallback should parse");

        assert_eq!(parsed["result"]["tools"], Value::Array(Vec::new()));
    }

    /// `RawTool` reads its schema from the camelCase `inputSchema` key.
    #[test]
    fn raw_tool_parses_input_schema_from_camel_case_key() {
        let payload = serde_json::json!({
            "name": "SEARCH_TOOLS",
            "description": "Search available tools",
            "inputSchema": { "type": "object", "properties": { "limit": { "type": "number" } } }
        });

        let parsed: RawTool =
            serde_json::from_value(payload).expect("raw tool should deserialize");

        assert_eq!(parsed.name, "SEARCH_TOOLS");
        let schema_type = parsed
            .input_schema
            .as_ref()
            .and_then(|value| value.get("type"))
            .and_then(Value::as_str);
        assert_eq!(schema_type, Some("object"));
    }

    /// Resource items lacking a `resource` object are skipped rather than ending the search.
    #[test]
    fn extract_json_resource_text_skips_resource_items_without_resource_payload() {
        let payload = serde_json::json!({
            "content": [
                { "type": "resource" },
                {
                    "type": "resource",
                    "resource": {
                        "mimeType": "application/json",
                        "text": "{\"ok\":true}"
                    }
                }
            ]
        });

        let extracted = extract_json_resource_text(&payload);

        assert_eq!(extracted, Some("{\"ok\":true}".to_string()));
    }

    /// The `user_token` wire value maps to `RuntimeIntegrationConnectType::UserToken`.
    #[test]
    fn runtime_integration_record_parses_user_token_connect_type() {
        let payload = serde_json::json!({
            "id": "convex-int",
            "name": "Convex",
            "url": "https://convex.example.com/mcp",
            "category": "gateway_remote_mcp",
            "connectType": "user_token"
        });

        let parsed: RuntimeIntegrationRecord =
            serde_json::from_value(payload).expect("record should deserialize");

        assert_eq!(
            parsed.connect_type,
            RuntimeIntegrationConnectType::UserToken
        );
    }

    /// Connect types outside the enum are rejected with serde's "unknown variant" error.
    #[test]
    fn runtime_integration_record_rejects_unknown_connect_type() {
        let payload = serde_json::json!({
            "id": "convex-int",
            "name": "Convex",
            "url": "https://convex.example.com/mcp",
            "category": "gateway_remote_mcp",
            "connectType": "api_key"
        });

        let err = serde_json::from_value::<RuntimeIntegrationRecord>(payload)
            .expect_err("record should reject unknown connect type");

        assert!(err.to_string().contains("unknown variant"));
    }
}