Skip to main content

kontext_dev_sdk/
mcp.rs

1use std::sync::Arc;
2
3use serde::{Deserialize, Serialize};
4use serde_json::{Value, json};
5use tokio::sync::Mutex;
6
7use crate::KontextAuthSession;
8use crate::KontextDevClient;
9use crate::KontextDevConfig;
10use crate::KontextDevError;
11
12pub const DEFAULT_SERVER: &str = "https://api.kontext.dev";
13const MCP_SESSION_HEADER: &str = "Mcp-Session-Id";
14const META_SEARCH_TOOLS: &str = "SEARCH_TOOLS";
15const META_EXECUTE_TOOL: &str = "EXECUTE_TOOL";
16const DEFAULT_MCP_PROTOCOL_VERSION: &str = "2025-06-18";
17const STREAMABLE_HTTP_ACCEPT: &str = "application/json, text/event-stream";
18const STREAM_CONTENT_TYPE: &str = "text/event-stream";
19
20pub fn normalize_kontext_server_url(server: &str) -> String {
21    let mut url = server.trim_end_matches('/').to_string();
22    if let Some(stripped) = url.strip_suffix("/api/v1") {
23        url = stripped.to_string();
24    }
25    if let Some(stripped) = url.strip_suffix("/mcp") {
26        url = stripped.to_string();
27    }
28    url.trim_end_matches('/').to_string()
29}
30
31#[derive(Clone, Debug)]
32pub struct KontextMcpConfig {
33    pub client_id: String,
34    pub redirect_uri: String,
35    pub url: Option<String>,
36    pub server: Option<String>,
37    pub client_secret: Option<String>,
38    pub scope: Option<String>,
39    pub resource: Option<String>,
40    pub session_key: Option<String>,
41    pub integration_ui_url: Option<String>,
42    pub integration_return_to: Option<String>,
43    pub auth_timeout_seconds: Option<i64>,
44    pub open_connect_page_on_login: Option<bool>,
45    pub token_cache_path: Option<String>,
46}
47
48impl Default for KontextMcpConfig {
49    fn default() -> Self {
50        Self {
51            client_id: String::new(),
52            redirect_uri: "http://localhost:3333/callback".to_string(),
53            url: None,
54            server: Some(DEFAULT_SERVER.to_string()),
55            client_secret: None,
56            scope: None,
57            resource: None,
58            session_key: None,
59            integration_ui_url: None,
60            integration_return_to: None,
61            auth_timeout_seconds: None,
62            open_connect_page_on_login: None,
63            token_cache_path: None,
64        }
65    }
66}
67
68#[derive(Clone, Debug, Deserialize, Serialize, PartialEq, Eq)]
69#[serde(rename_all = "snake_case")]
70pub enum RuntimeIntegrationCategory {
71    GatewayRemoteMcp,
72    InternalMcpCredentials,
73}
74
75#[derive(Clone, Debug, Deserialize, Serialize, PartialEq, Eq)]
76#[serde(rename_all = "snake_case")]
77pub enum RuntimeIntegrationConnectType {
78    Oauth,
79    Credentials,
80    None,
81}
82
83#[derive(Clone, Debug, Deserialize, Serialize, PartialEq)]
84#[serde(rename_all = "camelCase")]
85pub struct RuntimeIntegrationRecord {
86    pub id: String,
87    pub name: String,
88    pub url: String,
89    pub category: RuntimeIntegrationCategory,
90    pub connect_type: RuntimeIntegrationConnectType,
91    #[serde(skip_serializing_if = "Option::is_none")]
92    pub auth_mode: Option<String>,
93    #[serde(skip_serializing_if = "Option::is_none")]
94    pub credential_schema: Option<serde_json::Value>,
95    #[serde(skip_serializing_if = "Option::is_none")]
96    pub requires_oauth: Option<bool>,
97    #[serde(skip_serializing_if = "Option::is_none")]
98    pub connection: Option<RuntimeIntegrationConnection>,
99}
100
101#[derive(Clone, Debug, Deserialize, Serialize, PartialEq)]
102#[serde(rename_all = "camelCase")]
103pub struct RuntimeIntegrationConnection {
104    pub connected: bool,
105    pub status: String,
106    #[serde(skip_serializing_if = "Option::is_none")]
107    pub expires_at: Option<String>,
108    #[serde(skip_serializing_if = "Option::is_none")]
109    pub display_name: Option<String>,
110}
111
112#[derive(Clone, Debug, Deserialize, Serialize, PartialEq)]
113#[serde(rename_all = "camelCase")]
114pub struct KontextTool {
115    pub id: String,
116    pub name: String,
117    #[serde(skip_serializing_if = "Option::is_none")]
118    pub description: Option<String>,
119    #[serde(skip_serializing_if = "Option::is_none")]
120    pub input_schema: Option<serde_json::Value>,
121    #[serde(skip_serializing_if = "Option::is_none")]
122    pub server: Option<KontextToolServer>,
123}
124
125#[derive(Clone, Debug, Deserialize, Serialize, PartialEq, Eq)]
126#[serde(rename_all = "camelCase")]
127pub struct KontextToolServer {
128    pub id: String,
129    #[serde(skip_serializing_if = "Option::is_none")]
130    pub name: Option<String>,
131}
132
133#[derive(Clone, Debug, Default)]
134struct McpSessionState {
135    session_id: Option<String>,
136    access_token: Option<String>,
137}
138
139#[derive(Clone, Debug)]
140pub struct KontextMcp {
141    config: KontextMcpConfig,
142    client: KontextDevClient,
143    http: reqwest::Client,
144    session: Arc<Mutex<McpSessionState>>,
145}
146
147impl KontextMcp {
148    pub fn new(config: KontextMcpConfig) -> Self {
149        let server =
150            normalize_kontext_server_url(config.server.as_deref().unwrap_or(DEFAULT_SERVER));
151        let sdk_config = KontextDevConfig {
152            server,
153            client_id: config.client_id.clone(),
154            client_secret: config.client_secret.clone(),
155            scope: config.scope.clone().unwrap_or_default(),
156            server_name: "kontext-dev".to_string(),
157            resource: config
158                .resource
159                .clone()
160                .unwrap_or_else(|| "mcp-gateway".to_string()),
161            integration_ui_url: config.integration_ui_url.clone(),
162            integration_return_to: config.integration_return_to.clone(),
163            open_connect_page_on_login: config.open_connect_page_on_login.unwrap_or(true),
164            auth_timeout_seconds: config.auth_timeout_seconds.unwrap_or(300),
165            token_cache_path: config.token_cache_path.clone(),
166            redirect_uri: config.redirect_uri.clone(),
167        };
168
169        Self {
170            config,
171            client: KontextDevClient::new(sdk_config),
172            http: reqwest::Client::new(),
173            session: Arc::new(Mutex::new(McpSessionState::default())),
174        }
175    }
176
177    pub fn client(&self) -> &KontextDevClient {
178        &self.client
179    }
180
181    pub async fn authenticate_mcp(&self) -> Result<KontextAuthSession, KontextDevError> {
182        self.client.authenticate_mcp().await
183    }
184
185    pub fn mcp_url(&self) -> Result<String, KontextDevError> {
186        if let Some(url) = &self.config.url {
187            return Ok(url.clone());
188        }
189        self.client.mcp_url()
190    }
191
192    pub async fn list_integrations(
193        &self,
194    ) -> Result<Vec<RuntimeIntegrationRecord>, KontextDevError> {
195        let session = self.authenticate_mcp().await?;
196        let base = self.client.server_base_url()?;
197        let response = self
198            .http
199            .get(format!("{}/mcp/integrations", base.trim_end_matches('/')))
200            .bearer_auth(session.gateway_token.access_token)
201            .send()
202            .await
203            .map_err(|err| KontextDevError::ConnectSession {
204                message: err.to_string(),
205            })?;
206
207        if !response.status().is_success() {
208            let status = response.status();
209            let body = response.text().await.unwrap_or_default();
210            return Err(KontextDevError::ConnectSession {
211                message: format!("{status}: {body}"),
212            });
213        }
214
215        #[derive(Deserialize)]
216        struct IntegrationsResponse {
217            items: Vec<RuntimeIntegrationRecord>,
218        }
219
220        let payload = response
221            .json::<IntegrationsResponse>()
222            .await
223            .map_err(|err| KontextDevError::ConnectSession {
224                message: err.to_string(),
225            })?;
226
227        Ok(payload.items)
228    }
229
230    pub async fn list_tools(&self) -> Result<Vec<KontextTool>, KontextDevError> {
231        let session = self.authenticate_mcp().await?;
232        self.list_tools_with_access_token(&session.gateway_token.access_token)
233            .await
234    }
235
236    pub async fn list_tools_with_access_token(
237        &self,
238        access_token: &str,
239    ) -> Result<Vec<KontextTool>, KontextDevError> {
240        let result = self
241            .json_rpc_with_session(
242                access_token,
243                "tools/list",
244                json!({}),
245                Some("list-tools"),
246                true,
247            )
248            .await?;
249        parse_tools_list_result(&result)
250    }
251
252    pub async fn call_tool(
253        &self,
254        tool_id: &str,
255        args: Option<serde_json::Map<String, serde_json::Value>>,
256    ) -> Result<serde_json::Value, KontextDevError> {
257        let session = self.authenticate_mcp().await?;
258        self.call_tool_with_access_token(&session.gateway_token.access_token, tool_id, args)
259            .await
260    }
261
262    pub async fn call_tool_with_access_token(
263        &self,
264        access_token: &str,
265        tool_id: &str,
266        args: Option<serde_json::Map<String, serde_json::Value>>,
267    ) -> Result<serde_json::Value, KontextDevError> {
268        self.json_rpc_with_session(
269            access_token,
270            "tools/call",
271            json!({ "name": tool_id, "arguments": args.unwrap_or_default() }),
272            Some("call-tool"),
273            true,
274        )
275        .await
276    }
277
278    async fn json_rpc_with_session(
279        &self,
280        access_token: &str,
281        method: &str,
282        params: Value,
283        id: Option<&str>,
284        allow_session_retry: bool,
285    ) -> Result<Value, KontextDevError> {
286        let max_attempts = if allow_session_retry { 2 } else { 1 };
287        for attempt in 0..max_attempts {
288            let session_id = self.ensure_mcp_session(access_token).await?;
289
290            let response = self
291                .http
292                .post(self.mcp_url()?)
293                .bearer_auth(access_token)
294                .header(reqwest::header::ACCEPT, STREAMABLE_HTTP_ACCEPT)
295                .header(MCP_SESSION_HEADER, &session_id)
296                .json(&json!({
297                    "jsonrpc": "2.0",
298                    "id": id.unwrap_or("1"),
299                    "method": method,
300                    "params": params,
301                }))
302                .send()
303                .await
304                .map_err(|err| KontextDevError::ConnectSession {
305                    message: err.to_string(),
306                })?;
307
308            if !response.status().is_success() {
309                let status = response.status();
310                let body = response.text().await.unwrap_or_default();
311                let retryable = attempt + 1 < max_attempts && is_invalid_session_error(&body);
312                if retryable {
313                    self.invalidate_session().await;
314                    continue;
315                }
316                return Err(KontextDevError::ConnectSession {
317                    message: format!("{status}: {body}"),
318                });
319            }
320
321            let payload = parse_json_or_streamable_response(response).await?;
322
323            if let Some(error) = payload.get("error") {
324                let message = extract_jsonrpc_error_message(error);
325                let retryable =
326                    attempt + 1 < max_attempts && is_invalid_session_error(message.as_str());
327                if retryable {
328                    self.invalidate_session().await;
329                    continue;
330                }
331                return Err(KontextDevError::ConnectSession { message });
332            }
333
334            return Ok(payload.get("result").cloned().unwrap_or(Value::Null));
335        }
336
337        Err(KontextDevError::ConnectSession {
338            message: "MCP request failed after session retry".to_string(),
339        })
340    }
341
342    async fn ensure_mcp_session(&self, access_token: &str) -> Result<String, KontextDevError> {
343        {
344            let guard = self.session.lock().await;
345            if guard.access_token.as_deref() == Some(access_token)
346                && let Some(session_id) = guard.session_id.clone()
347            {
348                return Ok(session_id);
349            }
350        }
351
352        let initialize_response = self
353            .http
354            .post(self.mcp_url()?)
355            .bearer_auth(access_token)
356            .header(reqwest::header::ACCEPT, STREAMABLE_HTTP_ACCEPT)
357            .json(&json!({
358                "jsonrpc": "2.0",
359                "id": "initialize",
360                "method": "initialize",
361                "params": {
362                    "protocolVersion": DEFAULT_MCP_PROTOCOL_VERSION,
363                    "capabilities": {
364                        "tools": {}
365                    },
366                    "clientInfo": {
367                        "name": "kontext-dev-sdk-rs",
368                        "version": env!("CARGO_PKG_VERSION")
369                    }
370                }
371            }))
372            .send()
373            .await
374            .map_err(|err| KontextDevError::ConnectSession {
375                message: err.to_string(),
376            })?;
377
378        if !initialize_response.status().is_success() {
379            let status = initialize_response.status();
380            let body = initialize_response.text().await.unwrap_or_default();
381            return Err(KontextDevError::ConnectSession {
382                message: format!("{status}: {body}"),
383            });
384        }
385
386        let session_header = initialize_response
387            .headers()
388            .get(MCP_SESSION_HEADER)
389            .or_else(|| initialize_response.headers().get("mcp-session-id"))
390            .and_then(|value| value.to_str().ok())
391            .map(|value| value.trim().to_string());
392
393        let initialize_payload = parse_json_or_streamable_response(initialize_response).await?;
394
395        if let Some(error) = initialize_payload.get("error") {
396            return Err(KontextDevError::ConnectSession {
397                message: extract_jsonrpc_error_message(error),
398            });
399        }
400
401        let session_id = session_header
402            .or_else(|| {
403                initialize_payload
404                    .get("result")
405                    .and_then(|result| result.get("sessionId"))
406                    .and_then(|value| value.as_str())
407                    .map(|value| value.to_string())
408            })
409            .or_else(|| {
410                initialize_payload
411                    .get("result")
412                    .and_then(|result| result.get("session_id"))
413                    .and_then(|value| value.as_str())
414                    .map(|value| value.to_string())
415            })
416            .ok_or_else(|| KontextDevError::ConnectSession {
417                message: "MCP initialize did not return a session id".to_string(),
418            })?;
419
420        // Best-effort initialized notification. Many servers accept requests
421        // without it, but send it to follow the Streamable HTTP MCP handshake.
422        let _ = self
423            .http
424            .post(self.mcp_url()?)
425            .bearer_auth(access_token)
426            .header(reqwest::header::ACCEPT, STREAMABLE_HTTP_ACCEPT)
427            .header(MCP_SESSION_HEADER, &session_id)
428            .json(&json!({
429                "jsonrpc": "2.0",
430                "method": "notifications/initialized",
431                "params": {}
432            }))
433            .send()
434            .await;
435
436        {
437            let mut guard = self.session.lock().await;
438            guard.session_id = Some(session_id.clone());
439            guard.access_token = Some(access_token.to_string());
440        }
441
442        Ok(session_id)
443    }
444
445    async fn invalidate_session(&self) {
446        let mut guard = self.session.lock().await;
447        guard.session_id = None;
448        guard.access_token = None;
449    }
450}
451
452fn extract_jsonrpc_error_message(error: &Value) -> String {
453    error
454        .get("message")
455        .and_then(|value| value.as_str())
456        .map(ToString::to_string)
457        .or_else(|| {
458            error
459                .get("error_description")
460                .and_then(|value| value.as_str())
461                .map(ToString::to_string)
462        })
463        .unwrap_or_else(|| error.to_string())
464}
465
466fn is_invalid_session_error(message: &str) -> bool {
467    let lower = message.to_ascii_lowercase();
468    lower.contains("no valid session id")
469        || lower.contains("no valid session-id")
470        || lower.contains("invalid session")
471}
472
473async fn parse_json_or_streamable_response(
474    response: reqwest::Response,
475) -> Result<Value, KontextDevError> {
476    let content_type = response
477        .headers()
478        .get(reqwest::header::CONTENT_TYPE)
479        .and_then(|value| value.to_str().ok())
480        .map(|value| value.to_ascii_lowercase())
481        .unwrap_or_default();
482    let body = response
483        .text()
484        .await
485        .map_err(|err| KontextDevError::ConnectSession {
486            message: err.to_string(),
487        })?;
488
489    parse_json_or_streamable_body(&body, &content_type)
490        .map_err(|message| KontextDevError::ConnectSession { message })
491}
492
493fn parse_json_or_streamable_body(body: &str, content_type: &str) -> Result<Value, String> {
494    let parse_json = || serde_json::from_str::<Value>(body).map_err(|err| err.to_string());
495    let parse_sse = || parse_sse_last_json_event(body);
496
497    if content_type.contains(STREAM_CONTENT_TYPE) {
498        return parse_sse().ok_or_else(|| {
499            "failed to parse streamable MCP response as SSE JSON events".to_string()
500        });
501    }
502
503    parse_json().or_else(|json_err| {
504        parse_sse().ok_or_else(|| format!("failed to decode response body: {json_err}"))
505    })
506}
507
508fn parse_sse_last_json_event(body: &str) -> Option<Value> {
509    let mut current_data = Vec::<String>::new();
510    let mut last_json = None;
511
512    let flush_data = |current_data: &mut Vec<String>, last_json: &mut Option<Value>| {
513        if current_data.is_empty() {
514            return;
515        }
516        let data = current_data.join("\n");
517        current_data.clear();
518        let trimmed = data.trim();
519        if trimmed.is_empty() || trimmed == "[DONE]" {
520            return;
521        }
522        if let Ok(value) = serde_json::from_str::<Value>(trimmed) {
523            *last_json = Some(value);
524        }
525    };
526
527    for line in body.lines() {
528        let line = line.trim_end_matches('\r');
529        if line.is_empty() {
530            flush_data(&mut current_data, &mut last_json);
531            continue;
532        }
533        if let Some(data) = line.strip_prefix("data:") {
534            current_data.push(data.trim_start().to_string());
535            continue;
536        }
537        if let Ok(value) = serde_json::from_str::<Value>(line) {
538            last_json = Some(value);
539        }
540    }
541    flush_data(&mut current_data, &mut last_json);
542
543    last_json
544}
545
546pub(crate) fn has_meta_gateway_tools(tools: &[KontextTool]) -> bool {
547    let mut has_search = false;
548    let mut has_execute = false;
549    for tool in tools {
550        if tool.name == META_SEARCH_TOOLS {
551            has_search = true;
552        } else if tool.name == META_EXECUTE_TOOL {
553            has_execute = true;
554        }
555    }
556    has_search && has_execute
557}
558
559pub(crate) fn extract_json_resource_text(result: &Value) -> Option<String> {
560    let content = result.get("content")?.as_array()?;
561    for item in content {
562        if item.get("type").and_then(Value::as_str) != Some("resource") {
563            continue;
564        }
565        let Some(resource) = item.get("resource") else {
566            continue;
567        };
568        if resource.get("mimeType").and_then(Value::as_str) != Some("application/json") {
569            continue;
570        }
571        if let Some(text) = resource.get("text").and_then(Value::as_str) {
572            return Some(text.to_string());
573        }
574    }
575    None
576}
577
578pub(crate) fn extract_text_content(result: &Value) -> String {
579    let Some(content) = result.get("content").and_then(Value::as_array) else {
580        return result.to_string();
581    };
582
583    let mut text_items = Vec::new();
584    for item in content {
585        if item.get("type").and_then(Value::as_str) == Some("text")
586            && let Some(text) = item.get("text").and_then(Value::as_str)
587        {
588            text_items.push(text.to_string());
589        }
590    }
591    if !text_items.is_empty() {
592        return text_items.join("\n");
593    }
594
595    let mut resource_items = Vec::new();
596    for item in content {
597        if item.get("type").and_then(Value::as_str) != Some("resource") {
598            continue;
599        }
600        let Some(resource_text) = item
601            .get("resource")
602            .and_then(|resource| resource.get("text"))
603            .and_then(Value::as_str)
604        else {
605            continue;
606        };
607
608        let parsed = serde_json::from_str::<Value>(resource_text)
609            .ok()
610            .map(|value| extract_text_content(&value))
611            .unwrap_or_else(|| resource_text.to_string());
612        resource_items.push(parsed);
613    }
614
615    if !resource_items.is_empty() {
616        return resource_items.join("\n");
617    }
618
619    content
620        .iter()
621        .map(Value::to_string)
622        .collect::<Vec<_>>()
623        .join("\n")
624}
625
626#[derive(Clone, Debug)]
627pub(crate) struct GatewayToolsPayload {
628    pub tools: Vec<KontextTool>,
629    pub errors: Vec<GatewayToolError>,
630    pub elicitations: Vec<GatewayElicitation>,
631}
632
633#[derive(Clone, Debug, Deserialize, Serialize, PartialEq, Eq)]
634#[serde(rename_all = "camelCase")]
635pub struct GatewayToolError {
636    pub server_id: String,
637    #[serde(default)]
638    pub server_name: Option<String>,
639    #[serde(default)]
640    pub reason: Option<String>,
641}
642
643#[derive(Clone, Debug, Deserialize, Serialize, PartialEq, Eq)]
644#[serde(rename_all = "camelCase")]
645pub struct GatewayElicitation {
646    pub url: String,
647    #[serde(default)]
648    pub message: Option<String>,
649    #[serde(default)]
650    pub integration_id: Option<String>,
651    #[serde(default)]
652    pub integration_name: Option<String>,
653}
654
655#[derive(Clone, Debug, Deserialize)]
656#[serde(rename_all = "camelCase")]
657struct GatewayToolSummary {
658    id: String,
659    name: String,
660    #[serde(default)]
661    description: Option<String>,
662    #[serde(default)]
663    input_schema: Option<Value>,
664    #[serde(default)]
665    server: Option<GatewayToolServer>,
666}
667
668#[derive(Clone, Debug, Deserialize)]
669#[serde(rename_all = "camelCase")]
670struct GatewayToolServer {
671    #[serde(default)]
672    id: Option<String>,
673    #[serde(default)]
674    name: Option<String>,
675}
676
677#[derive(Debug, Deserialize)]
678#[serde(rename_all = "camelCase")]
679struct RawTool {
680    name: String,
681    #[serde(default)]
682    description: Option<String>,
683    #[serde(default)]
684    input_schema: Option<serde_json::Value>,
685}
686
687fn parse_tools_list_result(result: &Value) -> Result<Vec<KontextTool>, KontextDevError> {
688    let tools = result
689        .get("tools")
690        .and_then(|value| value.as_array())
691        .cloned()
692        .unwrap_or_default();
693
694    tools
695        .into_iter()
696        .map(|tool| {
697            let raw: RawTool =
698                serde_json::from_value(tool).map_err(|err| KontextDevError::ConnectSession {
699                    message: format!("invalid tool payload: {err}"),
700                })?;
701
702            Ok(KontextTool {
703                id: raw.name.clone(),
704                name: raw.name,
705                description: raw.description,
706                input_schema: raw.input_schema,
707                server: None,
708            })
709        })
710        .collect()
711}
712
713pub(crate) fn parse_gateway_tools_payload(
714    raw: &Value,
715) -> Result<GatewayToolsPayload, KontextDevError> {
716    let json_text =
717        extract_json_resource_text(raw).ok_or_else(|| KontextDevError::ConnectSession {
718            message: "SEARCH_TOOLS did not return JSON resource content".to_string(),
719        })?;
720
721    let parsed = serde_json::from_str::<Value>(&json_text).map_err(|err| {
722        KontextDevError::ConnectSession {
723            message: format!("SEARCH_TOOLS returned invalid JSON: {err}"),
724        }
725    })?;
726
727    if let Some(items) = parsed.as_array() {
728        let tools = items
729            .iter()
730            .cloned()
731            .map(serde_json::from_value::<GatewayToolSummary>)
732            .collect::<Result<Vec<_>, _>>()
733            .map_err(|err| KontextDevError::ConnectSession {
734                message: format!("SEARCH_TOOLS returned invalid tool entry: {err}"),
735            })?
736            .into_iter()
737            .map(to_kontext_gateway_tool)
738            .collect();
739        return Ok(GatewayToolsPayload {
740            tools,
741            errors: Vec::new(),
742            elicitations: Vec::new(),
743        });
744    }
745
746    let Some(obj) = parsed.as_object() else {
747        return Err(KontextDevError::ConnectSession {
748            message: "SEARCH_TOOLS response was not a JSON array or object".to_string(),
749        });
750    };
751
752    let tools = obj
753        .get("items")
754        .and_then(Value::as_array)
755        .cloned()
756        .unwrap_or_default()
757        .into_iter()
758        .map(serde_json::from_value::<GatewayToolSummary>)
759        .collect::<Result<Vec<_>, _>>()
760        .map_err(|err| KontextDevError::ConnectSession {
761            message: format!("SEARCH_TOOLS items contained invalid tool data: {err}"),
762        })?
763        .into_iter()
764        .map(to_kontext_gateway_tool)
765        .collect::<Vec<_>>();
766
767    let errors = obj
768        .get("errors")
769        .and_then(Value::as_array)
770        .cloned()
771        .unwrap_or_default()
772        .into_iter()
773        .filter_map(|value| serde_json::from_value::<GatewayToolError>(value).ok())
774        .collect::<Vec<_>>();
775
776    let elicitations = obj
777        .get("elicitations")
778        .and_then(Value::as_array)
779        .cloned()
780        .unwrap_or_default()
781        .into_iter()
782        .filter_map(|value| serde_json::from_value::<GatewayElicitation>(value).ok())
783        .collect::<Vec<_>>();
784
785    Ok(GatewayToolsPayload {
786        tools,
787        errors,
788        elicitations,
789    })
790}
791
792fn to_kontext_gateway_tool(summary: GatewayToolSummary) -> KontextTool {
793    let server = summary.server.and_then(|server| {
794        server.id.map(|id| KontextToolServer {
795            id,
796            name: server.name,
797        })
798    });
799
800    KontextTool {
801        id: summary.id,
802        name: summary.name,
803        description: summary.description,
804        input_schema: summary.input_schema,
805        server,
806    }
807}
808
809#[cfg(test)]
810mod tests {
811    use super::*;
812
813    #[test]
814    fn parse_json_or_streamable_body_parses_json_payload() {
815        let parsed = parse_json_or_streamable_body(
816            r#"{"jsonrpc":"2.0","result":{"ok":true}}"#,
817            "application/json",
818        )
819        .expect("json should parse");
820        assert_eq!(parsed["result"]["ok"], Value::Bool(true));
821    }
822
823    #[test]
824    fn parse_json_or_streamable_body_parses_sse_payload() {
825        let parsed = parse_json_or_streamable_body(
826            "event: message\ndata: {\"jsonrpc\":\"2.0\",\"result\":{\"sessionId\":\"abc\"}}\n\n",
827            "text/event-stream",
828        )
829        .expect("sse should parse");
830        assert_eq!(
831            parsed["result"]["sessionId"],
832            Value::String("abc".to_string())
833        );
834    }
835
836    #[test]
837    fn parse_json_or_streamable_body_falls_back_to_sse_when_content_type_is_json() {
838        let parsed = parse_json_or_streamable_body(
839            "data: {\"jsonrpc\":\"2.0\",\"result\":{\"tools\":[]}}\n\n",
840            "application/json",
841        )
842        .expect("sse fallback should parse");
843        assert_eq!(parsed["result"]["tools"], Value::Array(Vec::new()));
844    }
845
846    #[test]
847    fn raw_tool_parses_input_schema_from_camel_case_key() {
848        let parsed: RawTool = serde_json::from_value(serde_json::json!({
849            "name": "SEARCH_TOOLS",
850            "description": "Search available tools",
851            "inputSchema": { "type": "object", "properties": { "limit": { "type": "number" } } }
852        }))
853        .expect("raw tool should deserialize");
854
855        assert_eq!(parsed.name, "SEARCH_TOOLS");
856        assert_eq!(
857            parsed
858                .input_schema
859                .as_ref()
860                .and_then(|value| value.get("type"))
861                .and_then(Value::as_str),
862            Some("object")
863        );
864    }
865
866    #[test]
867    fn extract_json_resource_text_skips_resource_items_without_resource_payload() {
868        let payload = serde_json::json!({
869            "content": [
870                { "type": "resource" },
871                {
872                    "type": "resource",
873                    "resource": {
874                        "mimeType": "application/json",
875                        "text": "{\"ok\":true}"
876                    }
877                }
878            ]
879        });
880
881        assert_eq!(
882            extract_json_resource_text(&payload),
883            Some("{\"ok\":true}".to_string())
884        );
885    }
886}