Skip to main content

kontext_dev_sdk/
mcp.rs

1use std::sync::Arc;
2
3use serde::{Deserialize, Serialize};
4use serde_json::{Value, json};
5use tokio::sync::Mutex;
6
7use crate::KontextAuthSession;
8use crate::KontextDevClient;
9use crate::KontextDevConfig;
10use crate::KontextDevError;
11
/// Kontext server base URL used when the configuration does not name one.
pub const DEFAULT_SERVER: &str = "https://api.kontext.dev";
/// HTTP header carrying the MCP session id (read from the initialize response,
/// sent on every later request).
const MCP_SESSION_HEADER: &str = "Mcp-Session-Id";
/// Tool name that `has_meta_gateway_tools` looks for as the "search" meta tool.
const META_SEARCH_TOOLS: &str = "SEARCH_TOOLS";
/// Tool name that `has_meta_gateway_tools` looks for as the "execute" meta tool.
const META_EXECUTE_TOOL: &str = "EXECUTE_TOOL";
/// `protocolVersion` value sent in the MCP `initialize` request.
const DEFAULT_MCP_PROTOCOL_VERSION: &str = "2025-06-18";
/// `Accept` header value sent on MCP requests: plain JSON or an SSE stream.
const STREAMABLE_HTTP_ACCEPT: &str = "application/json, text/event-stream";
/// Content-type substring that makes a response body be parsed as SSE.
const STREAM_CONTENT_TYPE: &str = "text/event-stream";
19
/// Reduces a user-supplied server URL to its bare base form.
///
/// Trailing slashes are removed, then one trailing `/api/v1` segment (if
/// present), then one trailing `/mcp` segment (if present), and finally any
/// slashes exposed by those removals. The suffixes are checked once each, in
/// that order.
pub fn normalize_kontext_server_url(server: &str) -> String {
    let mut remaining = server.trim_end_matches('/');
    for suffix in ["/api/v1", "/mcp"] {
        if let Some(prefix) = remaining.strip_suffix(suffix) {
            remaining = prefix;
        }
    }
    remaining.trim_end_matches('/').to_owned()
}
30
/// Configuration for [`KontextMcp`].
///
/// Most fields are forwarded to the underlying `KontextDevConfig` by
/// `KontextMcp::new`; optional fields fall back to the defaults noted below.
#[derive(Clone, Debug)]
pub struct KontextMcpConfig {
    /// Sent as `clientInfo.sessionId` in the MCP `initialize` request.
    pub client_session_id: String,
    /// Client id forwarded to the SDK configuration.
    pub client_id: String,
    /// Redirect URI forwarded to the SDK configuration.
    pub redirect_uri: String,
    /// When set, used verbatim as the MCP endpoint instead of the URL the SDK
    /// client derives.
    pub url: Option<String>,
    /// Server base URL; normalized with `normalize_kontext_server_url`.
    /// `DEFAULT_SERVER` is used when `None`.
    pub server: Option<String>,
    /// Optional client secret forwarded to the SDK configuration.
    pub client_secret: Option<String>,
    /// Scope forwarded to the SDK configuration; an empty string when `None`.
    pub scope: Option<String>,
    /// Resource forwarded to the SDK configuration; `"mcp-gateway"` when `None`.
    pub resource: Option<String>,
    /// Not read anywhere in the visible part of this file.
    pub session_key: Option<String>,
    /// Forwarded unchanged to the SDK configuration.
    pub integration_ui_url: Option<String>,
    /// Forwarded unchanged to the SDK configuration.
    pub integration_return_to: Option<String>,
    /// Forwarded to the SDK configuration; `300` when `None`.
    pub auth_timeout_seconds: Option<i64>,
    /// Forwarded to the SDK configuration; `true` when `None`.
    pub open_connect_page_on_login: Option<bool>,
    /// Forwarded unchanged to the SDK configuration.
    pub token_cache_path: Option<String>,
}
48
49impl Default for KontextMcpConfig {
50    fn default() -> Self {
51        Self {
52            client_session_id: String::new(),
53            client_id: String::new(),
54            redirect_uri: "http://localhost:3333/callback".to_string(),
55            url: None,
56            server: Some(DEFAULT_SERVER.to_string()),
57            client_secret: None,
58            scope: None,
59            resource: None,
60            session_key: None,
61            integration_ui_url: None,
62            integration_return_to: None,
63            auth_timeout_seconds: None,
64            open_connect_page_on_login: None,
65            token_cache_path: None,
66        }
67    }
68}
69
/// Category of a runtime integration record.
///
/// Serialized in snake_case (`gateway_remote_mcp`, `internal_mcp_credentials`).
#[derive(Clone, Debug, Deserialize, Serialize, PartialEq, Eq)]
#[serde(rename_all = "snake_case")]
pub enum RuntimeIntegrationCategory {
    GatewayRemoteMcp,
    InternalMcpCredentials,
}
76
/// Connect type reported for a runtime integration.
///
/// Serialized in snake_case (`oauth`, `credentials`, `none`).
#[derive(Clone, Debug, Deserialize, Serialize, PartialEq, Eq)]
#[serde(rename_all = "snake_case")]
pub enum RuntimeIntegrationConnectType {
    Oauth,
    Credentials,
    None,
}
84
/// One entry of the `items` array returned by `KontextMcp::list_integrations`.
///
/// Field names are camelCase on the wire; optional fields are omitted from
/// serialized output when `None`.
#[derive(Clone, Debug, Deserialize, Serialize, PartialEq)]
#[serde(rename_all = "camelCase")]
pub struct RuntimeIntegrationRecord {
    pub id: String,
    pub name: String,
    pub url: String,
    pub category: RuntimeIntegrationCategory,
    /// Wire name: `connectType`.
    pub connect_type: RuntimeIntegrationConnectType,
    /// Wire name: `authMode`.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub auth_mode: Option<String>,
    /// Wire name: `credentialSchema`; kept as untyped JSON.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub credential_schema: Option<serde_json::Value>,
    /// Wire name: `requiresOauth`.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub requires_oauth: Option<bool>,
    /// Connection details for this integration, when the server includes them.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub connection: Option<RuntimeIntegrationConnection>,
}
102
/// Connection details nested inside a [`RuntimeIntegrationRecord`].
///
/// Field names are camelCase on the wire; optional fields are omitted from
/// serialized output when `None`.
#[derive(Clone, Debug, Deserialize, Serialize, PartialEq)]
#[serde(rename_all = "camelCase")]
pub struct RuntimeIntegrationConnection {
    pub connected: bool,
    /// Free-form status string as sent by the server.
    pub status: String,
    /// Wire name: `expiresAt`. Kept as the raw string; not parsed here.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub expires_at: Option<String>,
    /// Wire name: `displayName`.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub display_name: Option<String>,
}
113
/// A tool as exposed by this SDK.
///
/// Built either from a `tools/list` entry (where `id` is a copy of `name` and
/// `server` is `None`) or from a gateway tool summary (where `id` and `server`
/// come from the summary). Field names are camelCase on the wire.
#[derive(Clone, Debug, Deserialize, Serialize, PartialEq)]
#[serde(rename_all = "camelCase")]
pub struct KontextTool {
    pub id: String,
    pub name: String,
    #[serde(skip_serializing_if = "Option::is_none")]
    pub description: Option<String>,
    /// Wire name: `inputSchema`; kept as untyped JSON.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub input_schema: Option<serde_json::Value>,
    /// Server that provides the tool, when known.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub server: Option<KontextToolServer>,
}
126
/// Identifies the server behind a [`KontextTool`]; `id` is mandatory, `name`
/// is optional and omitted from serialized output when `None`.
#[derive(Clone, Debug, Deserialize, Serialize, PartialEq, Eq)]
#[serde(rename_all = "camelCase")]
pub struct KontextToolServer {
    pub id: String,
    #[serde(skip_serializing_if = "Option::is_none")]
    pub name: Option<String>,
}
134
/// Cached MCP session, shared behind a mutex by `KontextMcp`.
///
/// The cached `session_id` is only reused when the caller's access token
/// equals `access_token`; both fields are cleared together on invalidation.
#[derive(Clone, Debug, Default)]
struct McpSessionState {
    /// Session id obtained from the last successful `initialize`.
    session_id: Option<String>,
    /// Access token that `session_id` was established with.
    access_token: Option<String>,
}
140
/// MCP client wrapper: authenticates through `KontextDevClient` and talks
/// JSON-RPC over HTTP to the MCP endpoint.
///
/// Clones share the same cached session because `session` is an `Arc`.
#[derive(Clone, Debug)]
pub struct KontextMcp {
    /// Configuration this instance was created from.
    config: KontextMcpConfig,
    /// SDK client used for authentication and URL derivation.
    client: KontextDevClient,
    /// HTTP client used for all requests issued by this type.
    http: reqwest::Client,
    /// Cached MCP session id and the access token it belongs to.
    session: Arc<Mutex<McpSessionState>>,
}
148
149impl KontextMcp {
150    pub fn new(config: KontextMcpConfig) -> Self {
151        let server =
152            normalize_kontext_server_url(config.server.as_deref().unwrap_or(DEFAULT_SERVER));
153        let sdk_config = KontextDevConfig {
154            server,
155            client_id: config.client_id.clone(),
156            client_secret: config.client_secret.clone(),
157            scope: config.scope.clone().unwrap_or_default(),
158            server_name: "kontext-dev".to_string(),
159            resource: config
160                .resource
161                .clone()
162                .unwrap_or_else(|| "mcp-gateway".to_string()),
163            integration_ui_url: config.integration_ui_url.clone(),
164            integration_return_to: config.integration_return_to.clone(),
165            open_connect_page_on_login: config.open_connect_page_on_login.unwrap_or(true),
166            auth_timeout_seconds: config.auth_timeout_seconds.unwrap_or(300),
167            token_cache_path: config.token_cache_path.clone(),
168            redirect_uri: config.redirect_uri.clone(),
169        };
170
171        Self {
172            config,
173            client: KontextDevClient::new(sdk_config),
174            http: reqwest::Client::new(),
175            session: Arc::new(Mutex::new(McpSessionState::default())),
176        }
177    }
178
    /// Returns the underlying SDK client this wrapper was constructed with.
    pub fn client(&self) -> &KontextDevClient {
        &self.client
    }
182
    /// Delegates to `KontextDevClient::authenticate_mcp` and returns its
    /// result unchanged.
    pub async fn authenticate_mcp(&self) -> Result<KontextAuthSession, KontextDevError> {
        self.client.authenticate_mcp().await
    }
186
187    pub fn mcp_url(&self) -> Result<String, KontextDevError> {
188        if let Some(url) = &self.config.url {
189            return Ok(url.clone());
190        }
191        self.client.mcp_url()
192    }
193
194    pub async fn list_integrations(
195        &self,
196    ) -> Result<Vec<RuntimeIntegrationRecord>, KontextDevError> {
197        let session = self.authenticate_mcp().await?;
198        let base = self.client.server_base_url()?;
199        let response = self
200            .http
201            .get(format!("{}/mcp/integrations", base.trim_end_matches('/')))
202            .bearer_auth(session.gateway_token.access_token)
203            .send()
204            .await
205            .map_err(|err| KontextDevError::ConnectSession {
206                message: err.to_string(),
207            })?;
208
209        if !response.status().is_success() {
210            let status = response.status();
211            let body = response.text().await.unwrap_or_default();
212            return Err(KontextDevError::ConnectSession {
213                message: format!("{status}: {body}"),
214            });
215        }
216
217        #[derive(Deserialize)]
218        struct IntegrationsResponse {
219            items: Vec<RuntimeIntegrationRecord>,
220        }
221
222        let payload = response
223            .json::<IntegrationsResponse>()
224            .await
225            .map_err(|err| KontextDevError::ConnectSession {
226                message: err.to_string(),
227            })?;
228
229        Ok(payload.items)
230    }
231
232    pub async fn list_tools(&self) -> Result<Vec<KontextTool>, KontextDevError> {
233        let session = self.authenticate_mcp().await?;
234        self.list_tools_with_access_token(&session.gateway_token.access_token)
235            .await
236    }
237
238    pub async fn list_tools_with_access_token(
239        &self,
240        access_token: &str,
241    ) -> Result<Vec<KontextTool>, KontextDevError> {
242        let result = self
243            .json_rpc_with_session(
244                access_token,
245                "tools/list",
246                json!({}),
247                Some("list-tools"),
248                true,
249            )
250            .await?;
251        parse_tools_list_result(&result)
252    }
253
254    pub async fn call_tool(
255        &self,
256        tool_id: &str,
257        args: Option<serde_json::Map<String, serde_json::Value>>,
258    ) -> Result<serde_json::Value, KontextDevError> {
259        let session = self.authenticate_mcp().await?;
260        self.call_tool_with_access_token(&session.gateway_token.access_token, tool_id, args)
261            .await
262    }
263
264    pub async fn call_tool_with_access_token(
265        &self,
266        access_token: &str,
267        tool_id: &str,
268        args: Option<serde_json::Map<String, serde_json::Value>>,
269    ) -> Result<serde_json::Value, KontextDevError> {
270        self.json_rpc_with_session(
271            access_token,
272            "tools/call",
273            json!({ "name": tool_id, "arguments": args.unwrap_or_default() }),
274            Some("call-tool"),
275            true,
276        )
277        .await
278    }
279
280    async fn json_rpc_with_session(
281        &self,
282        access_token: &str,
283        method: &str,
284        params: Value,
285        id: Option<&str>,
286        allow_session_retry: bool,
287    ) -> Result<Value, KontextDevError> {
288        let max_attempts = if allow_session_retry { 2 } else { 1 };
289        for attempt in 0..max_attempts {
290            let session_id = self.ensure_mcp_session(access_token).await?;
291
292            let response = self
293                .http
294                .post(self.mcp_url()?)
295                .bearer_auth(access_token)
296                .header(reqwest::header::ACCEPT, STREAMABLE_HTTP_ACCEPT)
297                .header(MCP_SESSION_HEADER, &session_id)
298                .json(&json!({
299                    "jsonrpc": "2.0",
300                    "id": id.unwrap_or("1"),
301                    "method": method,
302                    "params": params,
303                }))
304                .send()
305                .await
306                .map_err(|err| KontextDevError::ConnectSession {
307                    message: err.to_string(),
308                })?;
309
310            if !response.status().is_success() {
311                let status = response.status();
312                let body = response.text().await.unwrap_or_default();
313                let retryable = attempt + 1 < max_attempts && is_invalid_session_error(&body);
314                if retryable {
315                    self.invalidate_session().await;
316                    continue;
317                }
318                return Err(KontextDevError::ConnectSession {
319                    message: format!("{status}: {body}"),
320                });
321            }
322
323            let payload = parse_json_or_streamable_response(response).await?;
324
325            if let Some(error) = payload.get("error") {
326                let message = extract_jsonrpc_error_message(error);
327                let retryable =
328                    attempt + 1 < max_attempts && is_invalid_session_error(message.as_str());
329                if retryable {
330                    self.invalidate_session().await;
331                    continue;
332                }
333                return Err(KontextDevError::ConnectSession { message });
334            }
335
336            return Ok(payload.get("result").cloned().unwrap_or(Value::Null));
337        }
338
339        Err(KontextDevError::ConnectSession {
340            message: "MCP request failed after session retry".to_string(),
341        })
342    }
343
344    async fn ensure_mcp_session(&self, access_token: &str) -> Result<String, KontextDevError> {
345        {
346            let guard = self.session.lock().await;
347            if guard.access_token.as_deref() == Some(access_token)
348                && let Some(session_id) = guard.session_id.clone()
349            {
350                return Ok(session_id);
351            }
352        }
353
354        let initialize_response = self
355            .http
356            .post(self.mcp_url()?)
357            .bearer_auth(access_token)
358            .header(reqwest::header::ACCEPT, STREAMABLE_HTTP_ACCEPT)
359            .json(&json!({
360                "jsonrpc": "2.0",
361                "id": "initialize",
362                "method": "initialize",
363                "params": {
364                    "protocolVersion": DEFAULT_MCP_PROTOCOL_VERSION,
365                    "capabilities": {
366                        "tools": {}
367                    },
368                    "clientInfo": {
369                        "name": "kontext-dev-sdk-rs",
370                        "version": env!("CARGO_PKG_VERSION"),
371                        "sessionId": self.config.client_session_id
372                    }
373                }
374            }))
375            .send()
376            .await
377            .map_err(|err| KontextDevError::ConnectSession {
378                message: err.to_string(),
379            })?;
380
381        if !initialize_response.status().is_success() {
382            let status = initialize_response.status();
383            let body = initialize_response.text().await.unwrap_or_default();
384            return Err(KontextDevError::ConnectSession {
385                message: format!("{status}: {body}"),
386            });
387        }
388
389        let session_header = initialize_response
390            .headers()
391            .get(MCP_SESSION_HEADER)
392            .or_else(|| initialize_response.headers().get("mcp-session-id"))
393            .and_then(|value| value.to_str().ok())
394            .map(|value| value.trim().to_string());
395
396        let initialize_payload = parse_json_or_streamable_response(initialize_response).await?;
397
398        if let Some(error) = initialize_payload.get("error") {
399            return Err(KontextDevError::ConnectSession {
400                message: extract_jsonrpc_error_message(error),
401            });
402        }
403
404        let session_id = session_header
405            .or_else(|| {
406                initialize_payload
407                    .get("result")
408                    .and_then(|result| result.get("sessionId"))
409                    .and_then(|value| value.as_str())
410                    .map(|value| value.to_string())
411            })
412            .or_else(|| {
413                initialize_payload
414                    .get("result")
415                    .and_then(|result| result.get("session_id"))
416                    .and_then(|value| value.as_str())
417                    .map(|value| value.to_string())
418            })
419            .ok_or_else(|| KontextDevError::ConnectSession {
420                message: "MCP initialize did not return a session id".to_string(),
421            })?;
422
423        // Best-effort initialized notification. Many servers accept requests
424        // without it, but send it to follow the Streamable HTTP MCP handshake.
425        let _ = self
426            .http
427            .post(self.mcp_url()?)
428            .bearer_auth(access_token)
429            .header(reqwest::header::ACCEPT, STREAMABLE_HTTP_ACCEPT)
430            .header(MCP_SESSION_HEADER, &session_id)
431            .json(&json!({
432                "jsonrpc": "2.0",
433                "method": "notifications/initialized",
434                "params": {}
435            }))
436            .send()
437            .await;
438
439        {
440            let mut guard = self.session.lock().await;
441            guard.session_id = Some(session_id.clone());
442            guard.access_token = Some(access_token.to_string());
443        }
444
445        Ok(session_id)
446    }
447
448    async fn invalidate_session(&self) {
449        let mut guard = self.session.lock().await;
450        guard.session_id = None;
451        guard.access_token = None;
452    }
453}
454
455fn extract_jsonrpc_error_message(error: &Value) -> String {
456    error
457        .get("message")
458        .and_then(|value| value.as_str())
459        .map(ToString::to_string)
460        .or_else(|| {
461            error
462                .get("error_description")
463                .and_then(|value| value.as_str())
464                .map(ToString::to_string)
465        })
466        .unwrap_or_else(|| error.to_string())
467}
468
/// Reports whether `message` looks like a server complaint about a missing
/// or invalid MCP session. Matching is ASCII case-insensitive and by
/// substring.
fn is_invalid_session_error(message: &str) -> bool {
    const MARKERS: [&str; 3] = [
        "no valid session id",
        "no valid session-id",
        "invalid session",
    ];

    let haystack = message.to_ascii_lowercase();
    MARKERS.iter().any(|marker| haystack.contains(*marker))
}
475
476async fn parse_json_or_streamable_response(
477    response: reqwest::Response,
478) -> Result<Value, KontextDevError> {
479    let content_type = response
480        .headers()
481        .get(reqwest::header::CONTENT_TYPE)
482        .and_then(|value| value.to_str().ok())
483        .map(|value| value.to_ascii_lowercase())
484        .unwrap_or_default();
485    let body = response
486        .text()
487        .await
488        .map_err(|err| KontextDevError::ConnectSession {
489            message: err.to_string(),
490        })?;
491
492    parse_json_or_streamable_body(&body, &content_type)
493        .map_err(|message| KontextDevError::ConnectSession { message })
494}
495
496fn parse_json_or_streamable_body(body: &str, content_type: &str) -> Result<Value, String> {
497    let parse_json = || serde_json::from_str::<Value>(body).map_err(|err| err.to_string());
498    let parse_sse = || parse_sse_last_json_event(body);
499
500    if content_type.contains(STREAM_CONTENT_TYPE) {
501        return parse_sse().ok_or_else(|| {
502            "failed to parse streamable MCP response as SSE JSON events".to_string()
503        });
504    }
505
506    parse_json().or_else(|json_err| {
507        parse_sse().ok_or_else(|| format!("failed to decode response body: {json_err}"))
508    })
509}
510
511fn parse_sse_last_json_event(body: &str) -> Option<Value> {
512    let mut current_data = Vec::<String>::new();
513    let mut last_json = None;
514
515    let flush_data = |current_data: &mut Vec<String>, last_json: &mut Option<Value>| {
516        if current_data.is_empty() {
517            return;
518        }
519        let data = current_data.join("\n");
520        current_data.clear();
521        let trimmed = data.trim();
522        if trimmed.is_empty() || trimmed == "[DONE]" {
523            return;
524        }
525        if let Ok(value) = serde_json::from_str::<Value>(trimmed) {
526            *last_json = Some(value);
527        }
528    };
529
530    for line in body.lines() {
531        let line = line.trim_end_matches('\r');
532        if line.is_empty() {
533            flush_data(&mut current_data, &mut last_json);
534            continue;
535        }
536        if let Some(data) = line.strip_prefix("data:") {
537            current_data.push(data.trim_start().to_string());
538            continue;
539        }
540        if let Ok(value) = serde_json::from_str::<Value>(line) {
541            last_json = Some(value);
542        }
543    }
544    flush_data(&mut current_data, &mut last_json);
545
546    last_json
547}
548
549pub(crate) fn has_meta_gateway_tools(tools: &[KontextTool]) -> bool {
550    let mut has_search = false;
551    let mut has_execute = false;
552    for tool in tools {
553        if tool.name == META_SEARCH_TOOLS {
554            has_search = true;
555        } else if tool.name == META_EXECUTE_TOOL {
556            has_execute = true;
557        }
558    }
559    has_search && has_execute
560}
561
562pub(crate) fn extract_json_resource_text(result: &Value) -> Option<String> {
563    let content = result.get("content")?.as_array()?;
564    for item in content {
565        if item.get("type").and_then(Value::as_str) != Some("resource") {
566            continue;
567        }
568        let Some(resource) = item.get("resource") else {
569            continue;
570        };
571        if resource.get("mimeType").and_then(Value::as_str) != Some("application/json") {
572            continue;
573        }
574        if let Some(text) = resource.get("text").and_then(Value::as_str) {
575            return Some(text.to_string());
576        }
577    }
578    None
579}
580
581pub(crate) fn extract_text_content(result: &Value) -> String {
582    let Some(content) = result.get("content").and_then(Value::as_array) else {
583        return result.to_string();
584    };
585
586    let mut text_items = Vec::new();
587    for item in content {
588        if item.get("type").and_then(Value::as_str) == Some("text")
589            && let Some(text) = item.get("text").and_then(Value::as_str)
590        {
591            text_items.push(text.to_string());
592        }
593    }
594    if !text_items.is_empty() {
595        return text_items.join("\n");
596    }
597
598    let mut resource_items = Vec::new();
599    for item in content {
600        if item.get("type").and_then(Value::as_str) != Some("resource") {
601            continue;
602        }
603        let Some(resource_text) = item
604            .get("resource")
605            .and_then(|resource| resource.get("text"))
606            .and_then(Value::as_str)
607        else {
608            continue;
609        };
610
611        let parsed = serde_json::from_str::<Value>(resource_text)
612            .ok()
613            .map(|value| extract_text_content(&value))
614            .unwrap_or_else(|| resource_text.to_string());
615        resource_items.push(parsed);
616    }
617
618    if !resource_items.is_empty() {
619        return resource_items.join("\n");
620    }
621
622    content
623        .iter()
624        .map(Value::to_string)
625        .collect::<Vec<_>>()
626        .join("\n")
627}
628
/// Decoded `SEARCH_TOOLS` response as produced by
/// `parse_gateway_tools_payload`.
#[derive(Clone, Debug)]
pub(crate) struct GatewayToolsPayload {
    /// Tools listed in the response.
    pub tools: Vec<KontextTool>,
    /// Entries of the response's `errors` array that decoded successfully.
    pub errors: Vec<GatewayToolError>,
    /// Entries of the response's `elicitations` array that decoded successfully.
    pub elicitations: Vec<GatewayElicitation>,
}
635
/// One entry of the `errors` array in a `SEARCH_TOOLS` response.
///
/// Field names are camelCase on the wire; only `serverId` is required.
#[derive(Clone, Debug, Deserialize, Serialize, PartialEq, Eq)]
#[serde(rename_all = "camelCase")]
pub struct GatewayToolError {
    pub server_id: String,
    #[serde(default)]
    pub server_name: Option<String>,
    #[serde(default)]
    pub reason: Option<String>,
}
645
/// One entry of the `elicitations` array in a `SEARCH_TOOLS` response.
///
/// Field names are camelCase on the wire; only `url` is required.
#[derive(Clone, Debug, Deserialize, Serialize, PartialEq, Eq)]
#[serde(rename_all = "camelCase")]
pub struct GatewayElicitation {
    pub url: String,
    #[serde(default)]
    pub message: Option<String>,
    #[serde(default)]
    pub integration_id: Option<String>,
    #[serde(default)]
    pub integration_name: Option<String>,
}
657
/// Wire shape of one tool entry in a `SEARCH_TOOLS` response; converted to a
/// [`KontextTool`] by `to_kontext_gateway_tool`.
///
/// Field names are camelCase on the wire; `id` and `name` are required.
#[derive(Clone, Debug, Deserialize)]
#[serde(rename_all = "camelCase")]
struct GatewayToolSummary {
    id: String,
    name: String,
    #[serde(default)]
    description: Option<String>,
    /// Wire name: `inputSchema`; kept as untyped JSON.
    #[serde(default)]
    input_schema: Option<Value>,
    #[serde(default)]
    server: Option<GatewayToolServer>,
}
670
/// Wire shape of the `server` object inside a [`GatewayToolSummary`]. Both
/// fields are optional here; a server without an `id` is dropped during
/// conversion to [`KontextToolServer`].
#[derive(Clone, Debug, Deserialize)]
#[serde(rename_all = "camelCase")]
struct GatewayToolServer {
    #[serde(default)]
    id: Option<String>,
    #[serde(default)]
    name: Option<String>,
}
679
/// Wire shape of one entry of the `tools` array in a `tools/list` result.
///
/// Field names are camelCase on the wire, so the schema is read from
/// `inputSchema`; only `name` is required.
#[derive(Debug, Deserialize)]
#[serde(rename_all = "camelCase")]
struct RawTool {
    name: String,
    #[serde(default)]
    description: Option<String>,
    #[serde(default)]
    input_schema: Option<serde_json::Value>,
}
689
690fn parse_tools_list_result(result: &Value) -> Result<Vec<KontextTool>, KontextDevError> {
691    let tools = result
692        .get("tools")
693        .and_then(|value| value.as_array())
694        .cloned()
695        .unwrap_or_default();
696
697    tools
698        .into_iter()
699        .map(|tool| {
700            let raw: RawTool =
701                serde_json::from_value(tool).map_err(|err| KontextDevError::ConnectSession {
702                    message: format!("invalid tool payload: {err}"),
703                })?;
704
705            Ok(KontextTool {
706                id: raw.name.clone(),
707                name: raw.name,
708                description: raw.description,
709                input_schema: raw.input_schema,
710                server: None,
711            })
712        })
713        .collect()
714}
715
716pub(crate) fn parse_gateway_tools_payload(
717    raw: &Value,
718) -> Result<GatewayToolsPayload, KontextDevError> {
719    let json_text =
720        extract_json_resource_text(raw).ok_or_else(|| KontextDevError::ConnectSession {
721            message: "SEARCH_TOOLS did not return JSON resource content".to_string(),
722        })?;
723
724    let parsed = serde_json::from_str::<Value>(&json_text).map_err(|err| {
725        KontextDevError::ConnectSession {
726            message: format!("SEARCH_TOOLS returned invalid JSON: {err}"),
727        }
728    })?;
729
730    if let Some(items) = parsed.as_array() {
731        let tools = items
732            .iter()
733            .cloned()
734            .map(serde_json::from_value::<GatewayToolSummary>)
735            .collect::<Result<Vec<_>, _>>()
736            .map_err(|err| KontextDevError::ConnectSession {
737                message: format!("SEARCH_TOOLS returned invalid tool entry: {err}"),
738            })?
739            .into_iter()
740            .map(to_kontext_gateway_tool)
741            .collect();
742        return Ok(GatewayToolsPayload {
743            tools,
744            errors: Vec::new(),
745            elicitations: Vec::new(),
746        });
747    }
748
749    let Some(obj) = parsed.as_object() else {
750        return Err(KontextDevError::ConnectSession {
751            message: "SEARCH_TOOLS response was not a JSON array or object".to_string(),
752        });
753    };
754
755    let tools = obj
756        .get("items")
757        .and_then(Value::as_array)
758        .cloned()
759        .unwrap_or_default()
760        .into_iter()
761        .map(serde_json::from_value::<GatewayToolSummary>)
762        .collect::<Result<Vec<_>, _>>()
763        .map_err(|err| KontextDevError::ConnectSession {
764            message: format!("SEARCH_TOOLS items contained invalid tool data: {err}"),
765        })?
766        .into_iter()
767        .map(to_kontext_gateway_tool)
768        .collect::<Vec<_>>();
769
770    let errors = obj
771        .get("errors")
772        .and_then(Value::as_array)
773        .cloned()
774        .unwrap_or_default()
775        .into_iter()
776        .filter_map(|value| serde_json::from_value::<GatewayToolError>(value).ok())
777        .collect::<Vec<_>>();
778
779    let elicitations = obj
780        .get("elicitations")
781        .and_then(Value::as_array)
782        .cloned()
783        .unwrap_or_default()
784        .into_iter()
785        .filter_map(|value| serde_json::from_value::<GatewayElicitation>(value).ok())
786        .collect::<Vec<_>>();
787
788    Ok(GatewayToolsPayload {
789        tools,
790        errors,
791        elicitations,
792    })
793}
794
795fn to_kontext_gateway_tool(summary: GatewayToolSummary) -> KontextTool {
796    let server = summary.server.and_then(|server| {
797        server.id.map(|id| KontextToolServer {
798            id,
799            name: server.name,
800        })
801    });
802
803    KontextTool {
804        id: summary.id,
805        name: summary.name,
806        description: summary.description,
807        input_schema: summary.input_schema,
808        server,
809    }
810}
811
// Unit tests for the pure parsing helpers in this module; nothing here
// touches the network.
#[cfg(test)]
mod tests {
    use super::*;

    // A plain JSON body with a JSON content type is decoded directly.
    #[test]
    fn parse_json_or_streamable_body_parses_json_payload() {
        let parsed = parse_json_or_streamable_body(
            r#"{"jsonrpc":"2.0","result":{"ok":true}}"#,
            "application/json",
        )
        .expect("json should parse");
        assert_eq!(parsed["result"]["ok"], Value::Bool(true));
    }

    // An SSE body with an SSE content type yields the JSON carried in `data:`.
    #[test]
    fn parse_json_or_streamable_body_parses_sse_payload() {
        let parsed = parse_json_or_streamable_body(
            "event: message\ndata: {\"jsonrpc\":\"2.0\",\"result\":{\"sessionId\":\"abc\"}}\n\n",
            "text/event-stream",
        )
        .expect("sse should parse");
        assert_eq!(
            parsed["result"]["sessionId"],
            Value::String("abc".to_string())
        );
    }

    // An SSE-framed body is still decoded when the content type claims JSON.
    #[test]
    fn parse_json_or_streamable_body_falls_back_to_sse_when_content_type_is_json() {
        let parsed = parse_json_or_streamable_body(
            "data: {\"jsonrpc\":\"2.0\",\"result\":{\"tools\":[]}}\n\n",
            "application/json",
        )
        .expect("sse fallback should parse");
        assert_eq!(parsed["result"]["tools"], Value::Array(Vec::new()));
    }

    // `RawTool` reads the schema from the camelCase `inputSchema` key.
    #[test]
    fn raw_tool_parses_input_schema_from_camel_case_key() {
        let parsed: RawTool = serde_json::from_value(serde_json::json!({
            "name": "SEARCH_TOOLS",
            "description": "Search available tools",
            "inputSchema": { "type": "object", "properties": { "limit": { "type": "number" } } }
        }))
        .expect("raw tool should deserialize");

        assert_eq!(parsed.name, "SEARCH_TOOLS");
        assert_eq!(
            parsed
                .input_schema
                .as_ref()
                .and_then(|value| value.get("type"))
                .and_then(Value::as_str),
            Some("object")
        );
    }

    // A `resource` item lacking a `resource` object is skipped rather than
    // aborting the search; the following valid item is returned.
    #[test]
    fn extract_json_resource_text_skips_resource_items_without_resource_payload() {
        let payload = serde_json::json!({
            "content": [
                { "type": "resource" },
                {
                    "type": "resource",
                    "resource": {
                        "mimeType": "application/json",
                        "text": "{\"ok\":true}"
                    }
                }
            ]
        });

        assert_eq!(
            extract_json_resource_text(&payload),
            Some("{\"ok\":true}".to_string())
        );
    }
}
889}