1use std::sync::Arc;
2
3use serde::{Deserialize, Serialize};
4use serde_json::{Value, json};
5use tokio::sync::Mutex;
6
7use crate::KontextAuthSession;
8use crate::KontextDevClient;
9use crate::KontextDevConfig;
10use crate::KontextDevError;
11
12pub const DEFAULT_SERVER: &str = "https://api.kontext.dev";
13const MCP_SESSION_HEADER: &str = "Mcp-Session-Id";
14const META_SEARCH_TOOLS: &str = "SEARCH_TOOLS";
15const META_EXECUTE_TOOL: &str = "EXECUTE_TOOL";
16const DEFAULT_MCP_PROTOCOL_VERSION: &str = "2025-06-18";
17const STREAMABLE_HTTP_ACCEPT: &str = "application/json, text/event-stream";
18const STREAM_CONTENT_TYPE: &str = "text/event-stream";
19
20pub fn normalize_kontext_server_url(server: &str) -> String {
21 let mut url = server.trim_end_matches('/').to_string();
22 if let Some(stripped) = url.strip_suffix("/api/v1") {
23 url = stripped.to_string();
24 }
25 if let Some(stripped) = url.strip_suffix("/mcp") {
26 url = stripped.to_string();
27 }
28 url.trim_end_matches('/').to_string()
29}
30
31#[derive(Clone, Debug)]
32pub struct KontextMcpConfig {
33 pub client_id: String,
34 pub redirect_uri: String,
35 pub url: Option<String>,
36 pub server: Option<String>,
37 pub client_secret: Option<String>,
38 pub scope: Option<String>,
39 pub resource: Option<String>,
40 pub session_key: Option<String>,
41 pub integration_ui_url: Option<String>,
42 pub integration_return_to: Option<String>,
43 pub auth_timeout_seconds: Option<i64>,
44 pub open_connect_page_on_login: Option<bool>,
45 pub token_cache_path: Option<String>,
46}
47
48impl Default for KontextMcpConfig {
49 fn default() -> Self {
50 Self {
51 client_id: String::new(),
52 redirect_uri: "http://localhost:3333/callback".to_string(),
53 url: None,
54 server: Some(DEFAULT_SERVER.to_string()),
55 client_secret: None,
56 scope: None,
57 resource: None,
58 session_key: None,
59 integration_ui_url: None,
60 integration_return_to: None,
61 auth_timeout_seconds: None,
62 open_connect_page_on_login: None,
63 token_cache_path: None,
64 }
65 }
66}
67
68#[derive(Clone, Debug, Deserialize, Serialize, PartialEq, Eq)]
69#[serde(rename_all = "snake_case")]
70pub enum RuntimeIntegrationCategory {
71 GatewayRemoteMcp,
72 InternalMcpCredentials,
73}
74
75#[derive(Clone, Debug, Deserialize, Serialize, PartialEq, Eq)]
76#[serde(rename_all = "snake_case")]
77pub enum RuntimeIntegrationConnectType {
78 Oauth,
79 Credentials,
80 None,
81}
82
83#[derive(Clone, Debug, Deserialize, Serialize, PartialEq)]
84#[serde(rename_all = "camelCase")]
85pub struct RuntimeIntegrationRecord {
86 pub id: String,
87 pub name: String,
88 pub url: String,
89 pub category: RuntimeIntegrationCategory,
90 pub connect_type: RuntimeIntegrationConnectType,
91 #[serde(skip_serializing_if = "Option::is_none")]
92 pub auth_mode: Option<String>,
93 #[serde(skip_serializing_if = "Option::is_none")]
94 pub credential_schema: Option<serde_json::Value>,
95 #[serde(skip_serializing_if = "Option::is_none")]
96 pub requires_oauth: Option<bool>,
97 #[serde(skip_serializing_if = "Option::is_none")]
98 pub connection: Option<RuntimeIntegrationConnection>,
99}
100
101#[derive(Clone, Debug, Deserialize, Serialize, PartialEq)]
102#[serde(rename_all = "camelCase")]
103pub struct RuntimeIntegrationConnection {
104 pub connected: bool,
105 pub status: String,
106 #[serde(skip_serializing_if = "Option::is_none")]
107 pub expires_at: Option<String>,
108 #[serde(skip_serializing_if = "Option::is_none")]
109 pub display_name: Option<String>,
110}
111
112#[derive(Clone, Debug, Deserialize, Serialize, PartialEq)]
113#[serde(rename_all = "camelCase")]
114pub struct KontextTool {
115 pub id: String,
116 pub name: String,
117 #[serde(skip_serializing_if = "Option::is_none")]
118 pub description: Option<String>,
119 #[serde(skip_serializing_if = "Option::is_none")]
120 pub input_schema: Option<serde_json::Value>,
121 #[serde(skip_serializing_if = "Option::is_none")]
122 pub server: Option<KontextToolServer>,
123}
124
125#[derive(Clone, Debug, Deserialize, Serialize, PartialEq, Eq)]
126#[serde(rename_all = "camelCase")]
127pub struct KontextToolServer {
128 pub id: String,
129 #[serde(skip_serializing_if = "Option::is_none")]
130 pub name: Option<String>,
131}
132
133#[derive(Clone, Debug, Default)]
134struct McpSessionState {
135 session_id: Option<String>,
136 access_token: Option<String>,
137}
138
139#[derive(Clone, Debug)]
140pub struct KontextMcp {
141 config: KontextMcpConfig,
142 client: KontextDevClient,
143 http: reqwest::Client,
144 session: Arc<Mutex<McpSessionState>>,
145}
146
147impl KontextMcp {
148 pub fn new(config: KontextMcpConfig) -> Self {
149 let server =
150 normalize_kontext_server_url(config.server.as_deref().unwrap_or(DEFAULT_SERVER));
151 let sdk_config = KontextDevConfig {
152 server,
153 client_id: config.client_id.clone(),
154 client_secret: config.client_secret.clone(),
155 scope: config.scope.clone().unwrap_or_default(),
156 server_name: "kontext-dev".to_string(),
157 resource: config
158 .resource
159 .clone()
160 .unwrap_or_else(|| "mcp-gateway".to_string()),
161 integration_ui_url: config.integration_ui_url.clone(),
162 integration_return_to: config.integration_return_to.clone(),
163 open_connect_page_on_login: config.open_connect_page_on_login.unwrap_or(true),
164 auth_timeout_seconds: config.auth_timeout_seconds.unwrap_or(300),
165 token_cache_path: config.token_cache_path.clone(),
166 redirect_uri: config.redirect_uri.clone(),
167 };
168
169 Self {
170 config,
171 client: KontextDevClient::new(sdk_config),
172 http: reqwest::Client::new(),
173 session: Arc::new(Mutex::new(McpSessionState::default())),
174 }
175 }
176
177 pub fn client(&self) -> &KontextDevClient {
178 &self.client
179 }
180
181 pub async fn authenticate_mcp(&self) -> Result<KontextAuthSession, KontextDevError> {
182 self.client.authenticate_mcp().await
183 }
184
185 pub fn mcp_url(&self) -> Result<String, KontextDevError> {
186 if let Some(url) = &self.config.url {
187 return Ok(url.clone());
188 }
189 self.client.mcp_url()
190 }
191
192 pub async fn list_integrations(
193 &self,
194 ) -> Result<Vec<RuntimeIntegrationRecord>, KontextDevError> {
195 let session = self.authenticate_mcp().await?;
196 let base = self.client.server_base_url()?;
197 let response = self
198 .http
199 .get(format!("{}/mcp/integrations", base.trim_end_matches('/')))
200 .bearer_auth(session.gateway_token.access_token)
201 .send()
202 .await
203 .map_err(|err| KontextDevError::ConnectSession {
204 message: err.to_string(),
205 })?;
206
207 if !response.status().is_success() {
208 let status = response.status();
209 let body = response.text().await.unwrap_or_default();
210 return Err(KontextDevError::ConnectSession {
211 message: format!("{status}: {body}"),
212 });
213 }
214
215 #[derive(Deserialize)]
216 struct IntegrationsResponse {
217 items: Vec<RuntimeIntegrationRecord>,
218 }
219
220 let payload = response
221 .json::<IntegrationsResponse>()
222 .await
223 .map_err(|err| KontextDevError::ConnectSession {
224 message: err.to_string(),
225 })?;
226
227 Ok(payload.items)
228 }
229
230 pub async fn list_tools(&self) -> Result<Vec<KontextTool>, KontextDevError> {
231 let session = self.authenticate_mcp().await?;
232 self.list_tools_with_access_token(&session.gateway_token.access_token)
233 .await
234 }
235
236 pub async fn list_tools_with_access_token(
237 &self,
238 access_token: &str,
239 ) -> Result<Vec<KontextTool>, KontextDevError> {
240 let result = self
241 .json_rpc_with_session(
242 access_token,
243 "tools/list",
244 json!({}),
245 Some("list-tools"),
246 true,
247 )
248 .await?;
249 parse_tools_list_result(&result)
250 }
251
252 pub async fn call_tool(
253 &self,
254 tool_id: &str,
255 args: Option<serde_json::Map<String, serde_json::Value>>,
256 ) -> Result<serde_json::Value, KontextDevError> {
257 let session = self.authenticate_mcp().await?;
258 self.call_tool_with_access_token(&session.gateway_token.access_token, tool_id, args)
259 .await
260 }
261
262 pub async fn call_tool_with_access_token(
263 &self,
264 access_token: &str,
265 tool_id: &str,
266 args: Option<serde_json::Map<String, serde_json::Value>>,
267 ) -> Result<serde_json::Value, KontextDevError> {
268 self.json_rpc_with_session(
269 access_token,
270 "tools/call",
271 json!({ "name": tool_id, "arguments": args.unwrap_or_default() }),
272 Some("call-tool"),
273 true,
274 )
275 .await
276 }
277
278 async fn json_rpc_with_session(
279 &self,
280 access_token: &str,
281 method: &str,
282 params: Value,
283 id: Option<&str>,
284 allow_session_retry: bool,
285 ) -> Result<Value, KontextDevError> {
286 let max_attempts = if allow_session_retry { 2 } else { 1 };
287 for attempt in 0..max_attempts {
288 let session_id = self.ensure_mcp_session(access_token).await?;
289
290 let response = self
291 .http
292 .post(self.mcp_url()?)
293 .bearer_auth(access_token)
294 .header(reqwest::header::ACCEPT, STREAMABLE_HTTP_ACCEPT)
295 .header(MCP_SESSION_HEADER, &session_id)
296 .json(&json!({
297 "jsonrpc": "2.0",
298 "id": id.unwrap_or("1"),
299 "method": method,
300 "params": params,
301 }))
302 .send()
303 .await
304 .map_err(|err| KontextDevError::ConnectSession {
305 message: err.to_string(),
306 })?;
307
308 if !response.status().is_success() {
309 let status = response.status();
310 let body = response.text().await.unwrap_or_default();
311 let retryable = attempt + 1 < max_attempts && is_invalid_session_error(&body);
312 if retryable {
313 self.invalidate_session().await;
314 continue;
315 }
316 return Err(KontextDevError::ConnectSession {
317 message: format!("{status}: {body}"),
318 });
319 }
320
321 let payload = parse_json_or_streamable_response(response).await?;
322
323 if let Some(error) = payload.get("error") {
324 let message = extract_jsonrpc_error_message(error);
325 let retryable =
326 attempt + 1 < max_attempts && is_invalid_session_error(message.as_str());
327 if retryable {
328 self.invalidate_session().await;
329 continue;
330 }
331 return Err(KontextDevError::ConnectSession { message });
332 }
333
334 return Ok(payload.get("result").cloned().unwrap_or(Value::Null));
335 }
336
337 Err(KontextDevError::ConnectSession {
338 message: "MCP request failed after session retry".to_string(),
339 })
340 }
341
342 async fn ensure_mcp_session(&self, access_token: &str) -> Result<String, KontextDevError> {
343 {
344 let guard = self.session.lock().await;
345 if guard.access_token.as_deref() == Some(access_token)
346 && let Some(session_id) = guard.session_id.clone()
347 {
348 return Ok(session_id);
349 }
350 }
351
352 let initialize_response = self
353 .http
354 .post(self.mcp_url()?)
355 .bearer_auth(access_token)
356 .header(reqwest::header::ACCEPT, STREAMABLE_HTTP_ACCEPT)
357 .json(&json!({
358 "jsonrpc": "2.0",
359 "id": "initialize",
360 "method": "initialize",
361 "params": {
362 "protocolVersion": DEFAULT_MCP_PROTOCOL_VERSION,
363 "capabilities": {
364 "tools": {}
365 },
366 "clientInfo": {
367 "name": "kontext-dev-sdk-rs",
368 "version": env!("CARGO_PKG_VERSION")
369 }
370 }
371 }))
372 .send()
373 .await
374 .map_err(|err| KontextDevError::ConnectSession {
375 message: err.to_string(),
376 })?;
377
378 if !initialize_response.status().is_success() {
379 let status = initialize_response.status();
380 let body = initialize_response.text().await.unwrap_or_default();
381 return Err(KontextDevError::ConnectSession {
382 message: format!("{status}: {body}"),
383 });
384 }
385
386 let session_header = initialize_response
387 .headers()
388 .get(MCP_SESSION_HEADER)
389 .or_else(|| initialize_response.headers().get("mcp-session-id"))
390 .and_then(|value| value.to_str().ok())
391 .map(|value| value.trim().to_string());
392
393 let initialize_payload = parse_json_or_streamable_response(initialize_response).await?;
394
395 if let Some(error) = initialize_payload.get("error") {
396 return Err(KontextDevError::ConnectSession {
397 message: extract_jsonrpc_error_message(error),
398 });
399 }
400
401 let session_id = session_header
402 .or_else(|| {
403 initialize_payload
404 .get("result")
405 .and_then(|result| result.get("sessionId"))
406 .and_then(|value| value.as_str())
407 .map(|value| value.to_string())
408 })
409 .or_else(|| {
410 initialize_payload
411 .get("result")
412 .and_then(|result| result.get("session_id"))
413 .and_then(|value| value.as_str())
414 .map(|value| value.to_string())
415 })
416 .ok_or_else(|| KontextDevError::ConnectSession {
417 message: "MCP initialize did not return a session id".to_string(),
418 })?;
419
420 let _ = self
423 .http
424 .post(self.mcp_url()?)
425 .bearer_auth(access_token)
426 .header(reqwest::header::ACCEPT, STREAMABLE_HTTP_ACCEPT)
427 .header(MCP_SESSION_HEADER, &session_id)
428 .json(&json!({
429 "jsonrpc": "2.0",
430 "method": "notifications/initialized",
431 "params": {}
432 }))
433 .send()
434 .await;
435
436 {
437 let mut guard = self.session.lock().await;
438 guard.session_id = Some(session_id.clone());
439 guard.access_token = Some(access_token.to_string());
440 }
441
442 Ok(session_id)
443 }
444
445 async fn invalidate_session(&self) {
446 let mut guard = self.session.lock().await;
447 guard.session_id = None;
448 guard.access_token = None;
449 }
450}
451
452fn extract_jsonrpc_error_message(error: &Value) -> String {
453 error
454 .get("message")
455 .and_then(|value| value.as_str())
456 .map(ToString::to_string)
457 .or_else(|| {
458 error
459 .get("error_description")
460 .and_then(|value| value.as_str())
461 .map(ToString::to_string)
462 })
463 .unwrap_or_else(|| error.to_string())
464}
465
466fn is_invalid_session_error(message: &str) -> bool {
467 let lower = message.to_ascii_lowercase();
468 lower.contains("no valid session id")
469 || lower.contains("no valid session-id")
470 || lower.contains("invalid session")
471}
472
473async fn parse_json_or_streamable_response(
474 response: reqwest::Response,
475) -> Result<Value, KontextDevError> {
476 let content_type = response
477 .headers()
478 .get(reqwest::header::CONTENT_TYPE)
479 .and_then(|value| value.to_str().ok())
480 .map(|value| value.to_ascii_lowercase())
481 .unwrap_or_default();
482 let body = response
483 .text()
484 .await
485 .map_err(|err| KontextDevError::ConnectSession {
486 message: err.to_string(),
487 })?;
488
489 parse_json_or_streamable_body(&body, &content_type)
490 .map_err(|message| KontextDevError::ConnectSession { message })
491}
492
493fn parse_json_or_streamable_body(body: &str, content_type: &str) -> Result<Value, String> {
494 let parse_json = || serde_json::from_str::<Value>(body).map_err(|err| err.to_string());
495 let parse_sse = || parse_sse_last_json_event(body);
496
497 if content_type.contains(STREAM_CONTENT_TYPE) {
498 return parse_sse().ok_or_else(|| {
499 "failed to parse streamable MCP response as SSE JSON events".to_string()
500 });
501 }
502
503 parse_json().or_else(|json_err| {
504 parse_sse().ok_or_else(|| format!("failed to decode response body: {json_err}"))
505 })
506}
507
508fn parse_sse_last_json_event(body: &str) -> Option<Value> {
509 let mut current_data = Vec::<String>::new();
510 let mut last_json = None;
511
512 let flush_data = |current_data: &mut Vec<String>, last_json: &mut Option<Value>| {
513 if current_data.is_empty() {
514 return;
515 }
516 let data = current_data.join("\n");
517 current_data.clear();
518 let trimmed = data.trim();
519 if trimmed.is_empty() || trimmed == "[DONE]" {
520 return;
521 }
522 if let Ok(value) = serde_json::from_str::<Value>(trimmed) {
523 *last_json = Some(value);
524 }
525 };
526
527 for line in body.lines() {
528 let line = line.trim_end_matches('\r');
529 if line.is_empty() {
530 flush_data(&mut current_data, &mut last_json);
531 continue;
532 }
533 if let Some(data) = line.strip_prefix("data:") {
534 current_data.push(data.trim_start().to_string());
535 continue;
536 }
537 if let Ok(value) = serde_json::from_str::<Value>(line) {
538 last_json = Some(value);
539 }
540 }
541 flush_data(&mut current_data, &mut last_json);
542
543 last_json
544}
545
546pub(crate) fn has_meta_gateway_tools(tools: &[KontextTool]) -> bool {
547 let mut has_search = false;
548 let mut has_execute = false;
549 for tool in tools {
550 if tool.name == META_SEARCH_TOOLS {
551 has_search = true;
552 } else if tool.name == META_EXECUTE_TOOL {
553 has_execute = true;
554 }
555 }
556 has_search && has_execute
557}
558
559pub(crate) fn extract_json_resource_text(result: &Value) -> Option<String> {
560 let content = result.get("content")?.as_array()?;
561 for item in content {
562 if item.get("type").and_then(Value::as_str) != Some("resource") {
563 continue;
564 }
565 let Some(resource) = item.get("resource") else {
566 continue;
567 };
568 if resource.get("mimeType").and_then(Value::as_str) != Some("application/json") {
569 continue;
570 }
571 if let Some(text) = resource.get("text").and_then(Value::as_str) {
572 return Some(text.to_string());
573 }
574 }
575 None
576}
577
578pub(crate) fn extract_text_content(result: &Value) -> String {
579 let Some(content) = result.get("content").and_then(Value::as_array) else {
580 return result.to_string();
581 };
582
583 let mut text_items = Vec::new();
584 for item in content {
585 if item.get("type").and_then(Value::as_str) == Some("text")
586 && let Some(text) = item.get("text").and_then(Value::as_str)
587 {
588 text_items.push(text.to_string());
589 }
590 }
591 if !text_items.is_empty() {
592 return text_items.join("\n");
593 }
594
595 let mut resource_items = Vec::new();
596 for item in content {
597 if item.get("type").and_then(Value::as_str) != Some("resource") {
598 continue;
599 }
600 let Some(resource_text) = item
601 .get("resource")
602 .and_then(|resource| resource.get("text"))
603 .and_then(Value::as_str)
604 else {
605 continue;
606 };
607
608 let parsed = serde_json::from_str::<Value>(resource_text)
609 .ok()
610 .map(|value| extract_text_content(&value))
611 .unwrap_or_else(|| resource_text.to_string());
612 resource_items.push(parsed);
613 }
614
615 if !resource_items.is_empty() {
616 return resource_items.join("\n");
617 }
618
619 content
620 .iter()
621 .map(Value::to_string)
622 .collect::<Vec<_>>()
623 .join("\n")
624}
625
626#[derive(Clone, Debug)]
627pub(crate) struct GatewayToolsPayload {
628 pub tools: Vec<KontextTool>,
629 pub errors: Vec<GatewayToolError>,
630 pub elicitations: Vec<GatewayElicitation>,
631}
632
633#[derive(Clone, Debug, Deserialize, Serialize, PartialEq, Eq)]
634#[serde(rename_all = "camelCase")]
635pub struct GatewayToolError {
636 pub server_id: String,
637 #[serde(default)]
638 pub server_name: Option<String>,
639 #[serde(default)]
640 pub reason: Option<String>,
641}
642
643#[derive(Clone, Debug, Deserialize, Serialize, PartialEq, Eq)]
644#[serde(rename_all = "camelCase")]
645pub struct GatewayElicitation {
646 pub url: String,
647 #[serde(default)]
648 pub message: Option<String>,
649 #[serde(default)]
650 pub integration_id: Option<String>,
651 #[serde(default)]
652 pub integration_name: Option<String>,
653}
654
655#[derive(Clone, Debug, Deserialize)]
656#[serde(rename_all = "camelCase")]
657struct GatewayToolSummary {
658 id: String,
659 name: String,
660 #[serde(default)]
661 description: Option<String>,
662 #[serde(default)]
663 input_schema: Option<Value>,
664 #[serde(default)]
665 server: Option<GatewayToolServer>,
666}
667
668#[derive(Clone, Debug, Deserialize)]
669#[serde(rename_all = "camelCase")]
670struct GatewayToolServer {
671 #[serde(default)]
672 id: Option<String>,
673 #[serde(default)]
674 name: Option<String>,
675}
676
677#[derive(Debug, Deserialize)]
678#[serde(rename_all = "camelCase")]
679struct RawTool {
680 name: String,
681 #[serde(default)]
682 description: Option<String>,
683 #[serde(default)]
684 input_schema: Option<serde_json::Value>,
685}
686
687fn parse_tools_list_result(result: &Value) -> Result<Vec<KontextTool>, KontextDevError> {
688 let tools = result
689 .get("tools")
690 .and_then(|value| value.as_array())
691 .cloned()
692 .unwrap_or_default();
693
694 tools
695 .into_iter()
696 .map(|tool| {
697 let raw: RawTool =
698 serde_json::from_value(tool).map_err(|err| KontextDevError::ConnectSession {
699 message: format!("invalid tool payload: {err}"),
700 })?;
701
702 Ok(KontextTool {
703 id: raw.name.clone(),
704 name: raw.name,
705 description: raw.description,
706 input_schema: raw.input_schema,
707 server: None,
708 })
709 })
710 .collect()
711}
712
713pub(crate) fn parse_gateway_tools_payload(
714 raw: &Value,
715) -> Result<GatewayToolsPayload, KontextDevError> {
716 let json_text =
717 extract_json_resource_text(raw).ok_or_else(|| KontextDevError::ConnectSession {
718 message: "SEARCH_TOOLS did not return JSON resource content".to_string(),
719 })?;
720
721 let parsed = serde_json::from_str::<Value>(&json_text).map_err(|err| {
722 KontextDevError::ConnectSession {
723 message: format!("SEARCH_TOOLS returned invalid JSON: {err}"),
724 }
725 })?;
726
727 if let Some(items) = parsed.as_array() {
728 let tools = items
729 .iter()
730 .cloned()
731 .map(serde_json::from_value::<GatewayToolSummary>)
732 .collect::<Result<Vec<_>, _>>()
733 .map_err(|err| KontextDevError::ConnectSession {
734 message: format!("SEARCH_TOOLS returned invalid tool entry: {err}"),
735 })?
736 .into_iter()
737 .map(to_kontext_gateway_tool)
738 .collect();
739 return Ok(GatewayToolsPayload {
740 tools,
741 errors: Vec::new(),
742 elicitations: Vec::new(),
743 });
744 }
745
746 let Some(obj) = parsed.as_object() else {
747 return Err(KontextDevError::ConnectSession {
748 message: "SEARCH_TOOLS response was not a JSON array or object".to_string(),
749 });
750 };
751
752 let tools = obj
753 .get("items")
754 .and_then(Value::as_array)
755 .cloned()
756 .unwrap_or_default()
757 .into_iter()
758 .map(serde_json::from_value::<GatewayToolSummary>)
759 .collect::<Result<Vec<_>, _>>()
760 .map_err(|err| KontextDevError::ConnectSession {
761 message: format!("SEARCH_TOOLS items contained invalid tool data: {err}"),
762 })?
763 .into_iter()
764 .map(to_kontext_gateway_tool)
765 .collect::<Vec<_>>();
766
767 let errors = obj
768 .get("errors")
769 .and_then(Value::as_array)
770 .cloned()
771 .unwrap_or_default()
772 .into_iter()
773 .filter_map(|value| serde_json::from_value::<GatewayToolError>(value).ok())
774 .collect::<Vec<_>>();
775
776 let elicitations = obj
777 .get("elicitations")
778 .and_then(Value::as_array)
779 .cloned()
780 .unwrap_or_default()
781 .into_iter()
782 .filter_map(|value| serde_json::from_value::<GatewayElicitation>(value).ok())
783 .collect::<Vec<_>>();
784
785 Ok(GatewayToolsPayload {
786 tools,
787 errors,
788 elicitations,
789 })
790}
791
792fn to_kontext_gateway_tool(summary: GatewayToolSummary) -> KontextTool {
793 let server = summary.server.and_then(|server| {
794 server.id.map(|id| KontextToolServer {
795 id,
796 name: server.name,
797 })
798 });
799
800 KontextTool {
801 id: summary.id,
802 name: summary.name,
803 description: summary.description,
804 input_schema: summary.input_schema,
805 server,
806 }
807}
808
809#[cfg(test)]
810mod tests {
811 use super::*;
812
813 #[test]
814 fn parse_json_or_streamable_body_parses_json_payload() {
815 let parsed = parse_json_or_streamable_body(
816 r#"{"jsonrpc":"2.0","result":{"ok":true}}"#,
817 "application/json",
818 )
819 .expect("json should parse");
820 assert_eq!(parsed["result"]["ok"], Value::Bool(true));
821 }
822
823 #[test]
824 fn parse_json_or_streamable_body_parses_sse_payload() {
825 let parsed = parse_json_or_streamable_body(
826 "event: message\ndata: {\"jsonrpc\":\"2.0\",\"result\":{\"sessionId\":\"abc\"}}\n\n",
827 "text/event-stream",
828 )
829 .expect("sse should parse");
830 assert_eq!(
831 parsed["result"]["sessionId"],
832 Value::String("abc".to_string())
833 );
834 }
835
836 #[test]
837 fn parse_json_or_streamable_body_falls_back_to_sse_when_content_type_is_json() {
838 let parsed = parse_json_or_streamable_body(
839 "data: {\"jsonrpc\":\"2.0\",\"result\":{\"tools\":[]}}\n\n",
840 "application/json",
841 )
842 .expect("sse fallback should parse");
843 assert_eq!(parsed["result"]["tools"], Value::Array(Vec::new()));
844 }
845
846 #[test]
847 fn raw_tool_parses_input_schema_from_camel_case_key() {
848 let parsed: RawTool = serde_json::from_value(serde_json::json!({
849 "name": "SEARCH_TOOLS",
850 "description": "Search available tools",
851 "inputSchema": { "type": "object", "properties": { "limit": { "type": "number" } } }
852 }))
853 .expect("raw tool should deserialize");
854
855 assert_eq!(parsed.name, "SEARCH_TOOLS");
856 assert_eq!(
857 parsed
858 .input_schema
859 .as_ref()
860 .and_then(|value| value.get("type"))
861 .and_then(Value::as_str),
862 Some("object")
863 );
864 }
865
866 #[test]
867 fn extract_json_resource_text_skips_resource_items_without_resource_payload() {
868 let payload = serde_json::json!({
869 "content": [
870 { "type": "resource" },
871 {
872 "type": "resource",
873 "resource": {
874 "mimeType": "application/json",
875 "text": "{\"ok\":true}"
876 }
877 }
878 ]
879 });
880
881 assert_eq!(
882 extract_json_resource_text(&payload),
883 Some("{\"ok\":true}".to_string())
884 );
885 }
886}