1use std::{
2 collections::{BTreeMap, HashMap},
3 env,
4 ffi::OsString,
5 io,
6 path::{Path, PathBuf},
7 time::Duration,
8};
9
10use serde::{Deserialize, Serialize};
11use thiserror::Error;
12use tokio::{
13 process::{Child, ChildStderr, ChildStdin, ChildStdout, Command},
14 time,
15};
16
17use super::{
18 McpConfigError, McpConfigManager, McpServerDefinition, McpServerEntry, McpToolConfig,
19 McpTransport, StdioServerConfig, StdioServerDefinition, StreamableHttpDefinition,
20};
21
22#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq)]
24pub struct McpRuntimeServer {
25 pub name: String,
26 pub transport: McpRuntimeTransport,
27 #[serde(default, skip_serializing_if = "Option::is_none")]
28 pub description: Option<String>,
29 #[serde(default, skip_serializing_if = "Vec::is_empty")]
30 pub tags: Vec<String>,
31 #[serde(default, skip_serializing_if = "Option::is_none")]
32 pub tools: Option<McpToolConfig>,
33}
34
35#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq)]
37#[serde(tag = "transport", rename_all = "snake_case")]
38pub enum McpRuntimeTransport {
39 Stdio(StdioServerDefinition),
40 StreamableHttp(ResolvedStreamableHttpDefinition),
41}
42
43#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq)]
45pub struct ResolvedStreamableHttpDefinition {
46 pub url: String,
47 #[serde(default, skip_serializing_if = "BTreeMap::is_empty")]
48 pub headers: BTreeMap<String, String>,
49 #[serde(default, skip_serializing_if = "Option::is_none")]
50 pub bearer_env_var: Option<String>,
51 #[serde(default, skip_serializing_if = "Option::is_none")]
52 pub bearer_token: Option<String>,
53 #[serde(default, skip_serializing_if = "Option::is_none")]
54 pub connect_timeout_ms: Option<u64>,
55 #[serde(default, skip_serializing_if = "Option::is_none")]
56 pub request_timeout_ms: Option<u64>,
57}
58
59#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq)]
61pub struct McpServerLauncher {
62 pub name: String,
63 pub transport: McpServerLauncherTransport,
64 #[serde(default, skip_serializing_if = "Option::is_none")]
65 pub description: Option<String>,
66 #[serde(default, skip_serializing_if = "Vec::is_empty")]
67 pub tags: Vec<String>,
68 #[serde(default, skip_serializing_if = "Option::is_none")]
69 pub tools: Option<McpToolConfig>,
70}
71
72#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq)]
74pub enum McpServerLauncherTransport {
75 Stdio(StdioLauncher),
76 StreamableHttp(StreamableHttpConnector),
77}
78
79#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq)]
81pub struct StdioLauncher {
82 pub command: PathBuf,
83 pub args: Vec<String>,
84 pub env: Vec<(OsString, OsString)>,
85 pub current_dir: Option<PathBuf>,
86 pub timeout: Duration,
87 pub mirror_stdio: bool,
88}
89
90#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq)]
92pub struct StreamableHttpConnector {
93 pub url: String,
94 pub headers: BTreeMap<String, String>,
95 pub bearer_env_var: Option<String>,
96 pub bearer_token: Option<String>,
97 pub connect_timeout: Option<Duration>,
98 pub request_timeout: Option<Duration>,
99}
100
101impl From<McpServerEntry> for McpRuntimeServer {
102 fn from(entry: McpServerEntry) -> Self {
103 let McpServerEntry { name, definition } = entry;
104 McpRuntimeServer::from_definition(name, definition)
105 }
106}
107
108impl McpRuntimeServer {
109 pub fn from_definition(name: impl Into<String>, definition: McpServerDefinition) -> Self {
111 let McpServerDefinition {
112 transport,
113 description,
114 tags,
115 tools,
116 } = definition;
117
118 Self {
119 name: name.into(),
120 transport: McpRuntimeTransport::from_transport(transport),
121 description,
122 tags,
123 tools,
124 }
125 }
126
127 pub fn into_launcher(self, defaults: &StdioServerConfig) -> McpServerLauncher {
129 let McpRuntimeServer {
130 name,
131 transport,
132 description,
133 tags,
134 tools,
135 } = self;
136
137 let transport = match transport {
138 McpRuntimeTransport::Stdio(def) => {
139 McpServerLauncherTransport::Stdio(StdioLauncher::from_runtime(def, defaults))
140 }
141 McpRuntimeTransport::StreamableHttp(def) => {
142 McpServerLauncherTransport::StreamableHttp(def.into())
143 }
144 };
145
146 McpServerLauncher {
147 name,
148 transport,
149 description,
150 tags,
151 tools,
152 }
153 }
154
155 pub fn to_launcher(&self, defaults: &StdioServerConfig) -> McpServerLauncher {
157 self.clone().into_launcher(defaults)
158 }
159}
160
161impl McpRuntimeTransport {
162 fn from_transport(transport: McpTransport) -> Self {
163 match transport {
164 McpTransport::Stdio(definition) => McpRuntimeTransport::Stdio(definition),
165 McpTransport::StreamableHttp(definition) => {
166 McpRuntimeTransport::StreamableHttp(resolve_streamable_http(definition))
167 }
168 }
169 }
170}
171
172fn resolve_streamable_http(
173 definition: StreamableHttpDefinition,
174) -> ResolvedStreamableHttpDefinition {
175 let StreamableHttpDefinition {
176 url,
177 headers,
178 bearer_env_var,
179 connect_timeout_ms,
180 request_timeout_ms,
181 } = definition;
182
183 let mut headers = headers;
184 let mut bearer_token = None;
185 if let Some(env_var) = bearer_env_var.as_deref() {
186 if let Ok(token) = env::var(env_var) {
187 if !token.is_empty() {
188 let has_auth_header = headers
189 .keys()
190 .any(|key| key.eq_ignore_ascii_case("authorization"));
191 if !has_auth_header {
192 headers.insert("Authorization".into(), format!("Bearer {token}"));
193 }
194 bearer_token = Some(token);
195 }
196 }
197 }
198
199 ResolvedStreamableHttpDefinition {
200 url,
201 headers,
202 bearer_env_var,
203 bearer_token,
204 connect_timeout_ms,
205 request_timeout_ms,
206 }
207}
208
209impl StdioLauncher {
210 fn from_runtime(definition: StdioServerDefinition, defaults: &StdioServerConfig) -> Self {
211 let env = merge_stdio_env(
212 defaults.code_home.as_deref(),
213 &defaults.env,
214 &definition.env,
215 );
216
217 Self {
218 command: PathBuf::from(definition.command),
219 args: definition.args,
220 env,
221 current_dir: defaults.current_dir.clone(),
222 timeout: definition
223 .timeout_ms
224 .map(Duration::from_millis)
225 .unwrap_or(defaults.startup_timeout),
226 mirror_stdio: defaults.mirror_stdio,
227 }
228 }
229
230 pub fn command(&self) -> Command {
232 let mut command = Command::new(&self.command);
233 command
234 .args(&self.args)
235 .stdin(std::process::Stdio::piped())
236 .stdout(std::process::Stdio::piped())
237 .stderr(std::process::Stdio::piped())
238 .kill_on_drop(true);
239
240 if let Some(dir) = &self.current_dir {
241 command.current_dir(dir);
242 }
243
244 for (key, value) in &self.env {
245 command.env(key, value);
246 }
247
248 command
249 }
250}
251
252impl From<ResolvedStreamableHttpDefinition> for StreamableHttpConnector {
253 fn from(definition: ResolvedStreamableHttpDefinition) -> Self {
254 let ResolvedStreamableHttpDefinition {
255 url,
256 headers,
257 bearer_env_var,
258 bearer_token,
259 connect_timeout_ms,
260 request_timeout_ms,
261 } = definition;
262
263 Self {
264 url,
265 headers,
266 bearer_env_var,
267 bearer_token,
268 connect_timeout: connect_timeout_ms.map(Duration::from_millis),
269 request_timeout: request_timeout_ms.map(Duration::from_millis),
270 }
271 }
272}
273
/// Builds the environment variable list handed to a stdio server process.
///
/// Layers are applied in increasing precedence; a later layer overwrites an earlier one
/// when the keys are identical:
///
/// 1. `CODEX_HOME`, set to `code_home` when one is given;
/// 2. every pair of `base_env`, in slice order;
/// 3. every pair of `runtime_env`.
///
/// The returned pairs have unique keys and are sorted by key, so identical inputs always
/// produce an identical vector. This matters because `StdioLauncher` stores the result in a
/// `Vec` and derives `PartialEq` and `Serialize`: without a fixed order, two launchers built
/// from the same configuration could compare unequal or serialize differently.
pub(crate) fn merge_stdio_env(
    code_home: Option<&Path>,
    base_env: &[(OsString, OsString)],
    runtime_env: &BTreeMap<String, String>,
) -> Vec<(OsString, OsString)> {
    let mut merged: HashMap<OsString, OsString> =
        HashMap::with_capacity(base_env.len() + runtime_env.len() + 1);

    if let Some(code_home) = code_home {
        merged.insert(
            OsString::from("CODEX_HOME"),
            code_home.as_os_str().to_os_string(),
        );
    }

    // `HashMap::extend` replaces the value of an existing key, which gives the layering above.
    merged.extend(base_env.iter().cloned());
    merged.extend(
        runtime_env
            .iter()
            .map(|(key, value)| (OsString::from(key), OsString::from(value))),
    );

    // HashMap iteration order is arbitrary; sort so callers get a stable result.
    // Keys are unique at this point, so an unstable sort cannot reorder equal elements.
    let mut pairs: Vec<(OsString, OsString)> = merged.into_iter().collect();
    pairs.sort_unstable_by(|left, right| left.0.cmp(&right.0));
    pairs
}
298
299#[derive(Clone, Debug, PartialEq, Eq)]
301pub struct McpRuntimeSummary {
302 pub name: String,
303 pub description: Option<String>,
304 pub tags: Vec<String>,
305 pub tools: Option<McpToolConfig>,
306 pub transport: McpRuntimeSummaryTransport,
307}
308
/// Kind of transport reported in a [`McpRuntimeSummary`], without any connection details.
///
/// The enum has no payload, so it also derives `Copy` and `Hash`. Values can therefore be
/// passed by value and used as keys in hashed collections.
#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash)]
pub enum McpRuntimeSummaryTransport {
    /// The server is run as a child process and reached over its stdio pipes.
    Stdio,
    /// The server is reached over streamable HTTP.
    StreamableHttp,
}
315
316impl From<&McpServerLauncher> for McpRuntimeSummary {
317 fn from(launcher: &McpServerLauncher) -> Self {
318 let transport = match launcher.transport {
319 McpServerLauncherTransport::Stdio(_) => McpRuntimeSummaryTransport::Stdio,
320 McpServerLauncherTransport::StreamableHttp(_) => {
321 McpRuntimeSummaryTransport::StreamableHttp
322 }
323 };
324
325 Self {
326 name: launcher.name.clone(),
327 description: launcher.description.clone(),
328 tags: launcher.tags.clone(),
329 tools: launcher.tools.clone(),
330 transport,
331 }
332 }
333}
334
335#[derive(Debug, Error)]
337pub enum McpRuntimeError {
338 #[error("runtime `{0}` not found")]
339 NotFound(String),
340 #[error("runtime `{name}` uses `{actual}` transport (expected {expected})")]
341 UnsupportedTransport {
342 name: String,
343 expected: &'static str,
344 actual: &'static str,
345 },
346 #[error("failed to spawn `{command:?}`: {source}")]
347 Spawn {
348 command: PathBuf,
349 #[source]
350 source: io::Error,
351 },
352 #[error("stdio pipes unavailable for `{name}`")]
353 MissingPipes { name: String },
354 #[error("failed to stop `{name}`: {source}")]
355 Stop {
356 name: String,
357 #[source]
358 source: io::Error,
359 },
360 #[error("timed out stopping `{name}` after {timeout:?}")]
361 StopTimeout { name: String, timeout: Duration },
362}
363
364#[derive(Clone, Debug)]
370pub struct McpRuntimeManager {
371 launchers: BTreeMap<String, McpServerLauncher>,
372}
373
374impl McpRuntimeManager {
375 pub fn new(launchers: Vec<McpServerLauncher>) -> Self {
377 let mut map = BTreeMap::new();
378 for launcher in launchers {
379 map.insert(launcher.name.clone(), launcher);
380 }
381 Self { launchers: map }
382 }
383
384 pub fn available(&self) -> Vec<McpRuntimeSummary> {
386 self.launchers
387 .values()
388 .map(McpRuntimeSummary::from)
389 .collect()
390 }
391
392 pub fn launcher(&self, name: &str) -> Option<McpServerLauncher> {
394 self.launchers.get(name).cloned()
395 }
396
397 pub fn prepare(&self, name: &str) -> Result<McpRuntimeHandle, McpRuntimeError> {
399 let Some(launcher) = self.launcher(name) else {
400 return Err(McpRuntimeError::NotFound(name.to_string()));
401 };
402
403 let tools = launcher.tools.clone();
404 match launcher.transport {
405 McpServerLauncherTransport::Stdio(launch) => {
406 let mut command = launch.command();
407 let spawn_target = launch.command.clone();
408 let mut child = command.spawn().map_err(|source| McpRuntimeError::Spawn {
409 command: spawn_target,
410 source,
411 })?;
412
413 let stdout = child.stdout.take();
414 let stdin = child.stdin.take();
415 if let (Some(stdout), Some(stdin)) = (stdout, stdin) {
416 let stderr = child.stderr.take();
417 Ok(McpRuntimeHandle::Stdio(ManagedStdioRuntime {
418 name: launcher.name,
419 tools,
420 child,
421 stdin,
422 stdout,
423 stderr,
424 timeout: launch.timeout,
425 }))
426 } else {
427 let _ = child.start_kill();
428 Err(McpRuntimeError::MissingPipes {
429 name: launcher.name,
430 })
431 }
432 }
433 McpServerLauncherTransport::StreamableHttp(connector) => {
434 Ok(McpRuntimeHandle::StreamableHttp(ManagedHttpRuntime {
435 name: launcher.name,
436 connector,
437 tools,
438 }))
439 }
440 }
441 }
442}
443
444#[derive(Clone, Debug)]
446pub struct McpRuntimeApi {
447 manager: McpRuntimeManager,
448}
449
450impl McpRuntimeApi {
451 pub fn new(launchers: Vec<McpServerLauncher>) -> Self {
453 Self {
454 manager: McpRuntimeManager::new(launchers),
455 }
456 }
457
458 pub fn from_config(
462 config: &McpConfigManager,
463 defaults: &StdioServerConfig,
464 ) -> Result<Self, McpConfigError> {
465 let launchers = config.runtime_launchers(defaults)?;
466 Ok(Self::new(launchers))
467 }
468
469 pub fn available(&self) -> Vec<McpRuntimeSummary> {
471 self.manager.available()
472 }
473
474 pub fn launcher(&self, name: &str) -> Result<McpServerLauncher, McpRuntimeError> {
476 self.manager
477 .launcher(name)
478 .ok_or_else(|| McpRuntimeError::NotFound(name.to_string()))
479 }
480
481 pub fn stdio_launcher(&self, name: &str) -> Result<StdioLauncher, McpRuntimeError> {
483 let launcher = self.launcher(name)?;
484 match launcher.transport {
485 McpServerLauncherTransport::Stdio(launch) => Ok(launch),
486 McpServerLauncherTransport::StreamableHttp(_) => {
487 Err(McpRuntimeError::UnsupportedTransport {
488 name: launcher.name,
489 expected: "stdio",
490 actual: "streamable_http",
491 })
492 }
493 }
494 }
495
496 pub fn http_connector(&self, name: &str) -> Result<StreamableHttpConnector, McpRuntimeError> {
498 let launcher = self.launcher(name)?;
499 match launcher.transport {
500 McpServerLauncherTransport::StreamableHttp(connector) => Ok(connector),
501 McpServerLauncherTransport::Stdio(_) => Err(McpRuntimeError::UnsupportedTransport {
502 name: launcher.name,
503 expected: "streamable_http",
504 actual: "stdio",
505 }),
506 }
507 }
508
509 pub fn prepare(&self, name: &str) -> Result<McpRuntimeHandle, McpRuntimeError> {
511 self.manager.prepare(name)
512 }
513}
514
515#[derive(Debug)]
517pub enum McpRuntimeHandle {
518 Stdio(ManagedStdioRuntime),
519 StreamableHttp(ManagedHttpRuntime),
520}
521
522impl McpRuntimeHandle {
523 pub fn tools(&self) -> Option<&McpToolConfig> {
525 match self {
526 McpRuntimeHandle::Stdio(handle) => handle.tools.as_ref(),
527 McpRuntimeHandle::StreamableHttp(handle) => handle.tools.as_ref(),
528 }
529 }
530}
531
532#[derive(Debug)]
534pub struct ManagedStdioRuntime {
535 name: String,
536 tools: Option<McpToolConfig>,
537 child: Child,
538 stdin: ChildStdin,
539 stdout: ChildStdout,
540 stderr: Option<ChildStderr>,
541 timeout: Duration,
542}
543
544impl ManagedStdioRuntime {
545 pub fn name(&self) -> &str {
547 &self.name
548 }
549
550 pub fn tools(&self) -> Option<&McpToolConfig> {
552 self.tools.as_ref()
553 }
554
555 pub fn stdin_mut(&mut self) -> &mut ChildStdin {
557 &mut self.stdin
558 }
559
560 pub fn stdout_mut(&mut self) -> &mut ChildStdout {
562 &mut self.stdout
563 }
564
565 pub fn stderr_mut(&mut self) -> Option<&mut ChildStderr> {
567 self.stderr.as_mut()
568 }
569
570 pub async fn stop(&mut self) -> Result<(), McpRuntimeError> {
572 if let Ok(Some(_)) = self.child.try_wait() {
573 return Ok(());
574 }
575
576 let _ = self.child.start_kill();
577 match time::timeout(self.timeout, self.child.wait()).await {
578 Ok(Ok(_)) => Ok(()),
579 Ok(Err(source)) => Err(McpRuntimeError::Stop {
580 name: self.name.clone(),
581 source,
582 }),
583 Err(_) => Err(McpRuntimeError::StopTimeout {
584 name: self.name.clone(),
585 timeout: self.timeout,
586 }),
587 }
588 }
589}
590
591impl Drop for ManagedStdioRuntime {
592 fn drop(&mut self) {
593 let _ = self.child.start_kill();
594 }
595}
596
597#[derive(Clone, Debug)]
599pub struct ManagedHttpRuntime {
600 pub name: String,
601 pub connector: StreamableHttpConnector,
602 pub tools: Option<McpToolConfig>,
603}