1use super::module_boundary::{module_env_with_extra, module_uses_mcp, probe_module_mcp_tools};
4use super::*;
5
6pub fn run_module_boundary_once(
7 module: &ModuleConfig,
8 pre_spawn: Option<&PreSpawnData>,
9 timeout: Duration,
10) -> Result<EventEnvelope<UnifiedEvent>, RuntimeBoundaryError> {
11 run_module_boundary_with_env(module, pre_spawn, &[], timeout)
12}
13
14pub(super) fn run_module_boundary_with_env(
15 module: &ModuleConfig,
16 pre_spawn: Option<&PreSpawnData>,
17 extra_env: &[(String, String)],
18 timeout: Duration,
19) -> Result<EventEnvelope<UnifiedEvent>, RuntimeBoundaryError> {
20 let env = module_env_with_extra(module, pre_spawn, extra_env);
21 let line = run_process_json_line(&module.command, &module.args, &env, timeout)
22 .map_err(RuntimeBoundaryError::Process)?;
23 normalize_event_line(&line).map_err(RuntimeBoundaryError::Normalize)
24}
25
26pub fn run_discovered_module_once(
27 config: &MobKitConfig,
28 module_id: &str,
29 timeout: Duration,
30) -> Result<EventEnvelope<UnifiedEvent>, RuntimeFromConfigError> {
31 let module = config
32 .modules
33 .iter()
34 .find(|module| module.id == module_id)
35 .ok_or_else(|| {
36 RuntimeFromConfigError::Config(ConfigResolutionError::ModuleNotConfigured(
37 module_id.to_string(),
38 ))
39 })?;
40
41 if !config.discovery.modules.iter().any(|id| id == module_id) {
42 return Err(RuntimeFromConfigError::Config(
43 ConfigResolutionError::ModuleNotDiscovered(module_id.to_string()),
44 ));
45 }
46
47 let pre_spawn = config
48 .pre_spawn
49 .iter()
50 .find(|data| data.module_id == module_id);
51 run_module_boundary_once(module, pre_spawn, timeout).map_err(RuntimeFromConfigError::Runtime)
52}
53
54impl MobkitRuntimeHandle {
55 pub(super) fn is_module_loaded(&self, module_id: &str) -> bool {
56 self.loaded_modules.contains(module_id)
57 }
58 pub fn shutdown(&mut self) -> RuntimeShutdownReport {
59 let mut seq = self
60 .lifecycle_events
61 .last()
62 .map_or(0, |event| event.seq + 1);
63 self.lifecycle_events.push(LifecycleEvent {
64 seq,
65 stage: LifecycleStage::ShutdownRequested,
66 });
67 seq += 1;
68 self.lifecycle_events.push(LifecycleEvent {
69 seq,
70 stage: LifecycleStage::ShutdownComplete,
71 });
72 self.running = false;
73
74 let terminated_modules: Vec<String> = self.loaded_modules.iter().cloned().collect();
75 self.loaded_modules.clear();
76
77 let mut orphan_processes = 0_u32;
78 let children = std::mem::take(&mut self.live_children);
79 for (_, mut child) in children {
80 if terminate_child(&mut child, false).is_err() {
81 orphan_processes += 1;
82 }
83 }
84
85 RuntimeShutdownReport {
86 terminated_modules,
87 orphan_processes,
88 }
89 }
90
91 pub fn is_running(&self) -> bool {
92 self.running
93 }
94
95 pub fn loaded_modules(&self) -> Vec<String> {
96 self.loaded_modules.iter().cloned().collect()
97 }
98
99 pub(super) fn module_and_prespawn(
100 &self,
101 module_id: &str,
102 ) -> Option<(&ModuleConfig, Option<&PreSpawnData>)> {
103 let module = self
104 .config
105 .modules
106 .iter()
107 .find(|module| module.id == module_id)?;
108 let pre_spawn = self
109 .config
110 .pre_spawn
111 .iter()
112 .find(|data| data.module_id == module_id);
113 Some((module, pre_spawn))
114 }
115
116 pub fn reconcile_modules(
117 &mut self,
118 modules: Vec<String>,
119 timeout: Duration,
120 ) -> Result<usize, RuntimeMutationError> {
121 for module_id in &modules {
122 if self
123 .config
124 .modules
125 .iter()
126 .all(|configured| configured.id != *module_id)
127 {
128 return Err(RuntimeMutationError::Config(
129 ConfigResolutionError::ModuleNotConfigured(module_id.clone()),
130 ));
131 }
132 }
133
134 self.config.discovery.modules = modules.clone();
135 let mut added = 0_usize;
136 for module_id in modules {
137 if self.loaded_modules.contains(&module_id) {
138 continue;
139 }
140 self.spawn_member(&module_id, timeout)?;
141 added += 1;
142 }
143 Ok(added)
144 }
145
146 pub fn spawn_member(
147 &mut self,
148 module_id: &str,
149 timeout: Duration,
150 ) -> Result<(), RuntimeMutationError> {
151 let module = self
152 .config
153 .modules
154 .iter()
155 .find(|module| module.id == module_id)
156 .ok_or_else(|| {
157 RuntimeMutationError::Config(ConfigResolutionError::ModuleNotConfigured(
158 module_id.to_string(),
159 ))
160 })?;
161
162 let pre_spawn = self
163 .config
164 .pre_spawn
165 .iter()
166 .find(|data| data.module_id == module_id);
167
168 let mut result = supervise_module_start(module, pre_spawn, timeout, &self.runtime_options);
169 self.supervisor_report
170 .transitions
171 .append(&mut result.transitions);
172
173 if let Some(error) = result.terminal_error.clone() {
174 insert_event_sorted(
175 &mut self.merged_events,
176 supervisor_warning_event(module_id, &error),
177 );
178 }
179
180 let Some(event) = result.event else {
181 return Err(RuntimeMutationError::Runtime(
182 result
183 .terminal_error
184 .unwrap_or(RuntimeBoundaryError::Process(
185 ProcessBoundaryError::EmptyOutput,
186 )),
187 ));
188 };
189 let module_is_mcp = module_uses_mcp(module, pre_spawn);
190
191 if !self
192 .config
193 .discovery
194 .modules
195 .iter()
196 .any(|configured| configured == module_id)
197 {
198 self.config.discovery.modules.push(module_id.to_string());
199 }
200
201 if !module_is_mcp {
202 let Some(mut child) = result.child else {
203 return Err(RuntimeMutationError::Runtime(
204 result
205 .terminal_error
206 .unwrap_or(RuntimeBoundaryError::Process(
207 ProcessBoundaryError::EmptyOutput,
208 )),
209 ));
210 };
211 if let Some(mut existing_child) = self.live_children.remove(module_id)
212 && let Err(err) = terminate_child(
213 &mut existing_child,
214 self.runtime_options.supervisor_test_force_terminate_failure,
215 )
216 {
217 self.live_children
218 .insert(module_id.to_string(), existing_child);
219
220 let mut error_message =
221 format!("failed to terminate existing child before respawn: {err}");
222 if let Err(replacement_err) = terminate_child(&mut child, false) {
223 error_message.push_str(&format!(
224 "; failed to terminate replacement child after aborted respawn: {replacement_err}"
225 ));
226 }
227 let runtime_error =
228 RuntimeBoundaryError::Process(ProcessBoundaryError::Io(error_message));
229 insert_event_sorted(
230 &mut self.merged_events,
231 supervisor_warning_event(module_id, &runtime_error),
232 );
233 return Err(RuntimeMutationError::Runtime(runtime_error));
234 }
235 self.loaded_modules.insert(module_id.to_string());
236 self.live_children.insert(module_id.to_string(), child);
237 insert_event_sorted(&mut self.merged_events, event);
238 return Ok(());
239 }
240
241 if let Some(mut existing_child) = self.live_children.remove(module_id)
242 && let Err(err) = terminate_child(
243 &mut existing_child,
244 self.runtime_options.supervisor_test_force_terminate_failure,
245 )
246 {
247 self.live_children
248 .insert(module_id.to_string(), existing_child);
249 let runtime_error = RuntimeBoundaryError::Process(ProcessBoundaryError::Io(format!(
250 "failed to terminate existing child before MCP respawn: {err}"
251 )));
252 insert_event_sorted(
253 &mut self.merged_events,
254 supervisor_warning_event(module_id, &runtime_error),
255 );
256 return Err(RuntimeMutationError::Runtime(runtime_error));
257 }
258
259 self.loaded_modules.insert(module_id.to_string());
260 insert_event_sorted(&mut self.merged_events, event);
261 Ok(())
262 }
263}
264
/// Outcome of one supervised module start, as produced by `supervise_module_start`.
pub(super) struct SuperviseModuleStartResult {
    /// First event obtained from the module; `None` when the start ultimately failed.
    pub event: Option<EventEnvelope<UnifiedEvent>>,
    /// Handle of the spawned process. Always `None` for MCP modules and for failed starts.
    pub child: Option<Child>,
    /// Health-state transitions recorded during the start, in the order they occurred.
    pub transitions: Vec<ModuleHealthTransition>,
    /// Final error of the start. May be set together with `event` and `child` when the
    /// module started but a restart-time termination failed.
    pub terminal_error: Option<RuntimeBoundaryError>,
}
271
272pub(super) fn supervise_module_start(
273 module: &ModuleConfig,
274 pre_spawn: Option<&PreSpawnData>,
275 timeout: Duration,
276 options: &RuntimeOptions,
277) -> SuperviseModuleStartResult {
278 if module_uses_mcp(module, pre_spawn) {
279 return supervise_mcp_module_start(module, pre_spawn, timeout);
280 }
281
282 let mut transitions = vec![ModuleHealthTransition {
283 module_id: module.id.clone(),
284 from: None,
285 to: ModuleHealthState::Starting,
286 attempt: 0,
287 }];
288
289 let mut attempts = 0_u32;
290 let mut state = ModuleHealthState::Starting;
291
292 loop {
293 attempts += 1;
294 let result = spawn_module_capture_first_event(
295 module,
296 pre_spawn,
297 timeout,
298 options.supervisor_test_force_terminate_failure,
299 );
300
301 match result {
302 Ok((event, mut child)) => {
303 transitions.push(ModuleHealthTransition {
304 module_id: module.id.clone(),
305 from: Some(state.clone()),
306 to: ModuleHealthState::Healthy,
307 attempt: attempts,
308 });
309
310 let should_restart = match module.restart_policy {
311 RestartPolicy::Always => attempts <= options.always_restart_budget,
312 _ => false,
313 };
314
315 if should_restart {
316 transitions.push(ModuleHealthTransition {
317 module_id: module.id.clone(),
318 from: Some(ModuleHealthState::Healthy),
319 to: ModuleHealthState::Restarting,
320 attempt: attempts,
321 });
322 if let Err(err) =
323 terminate_child(&mut child, options.supervisor_test_force_terminate_failure)
324 {
325 transitions.push(ModuleHealthTransition {
326 module_id: module.id.clone(),
327 from: Some(ModuleHealthState::Restarting),
328 to: ModuleHealthState::Failed,
329 attempt: attempts,
330 });
331 transitions.push(ModuleHealthTransition {
332 module_id: module.id.clone(),
333 from: Some(ModuleHealthState::Failed),
334 to: ModuleHealthState::Healthy,
335 attempt: attempts,
336 });
337 return SuperviseModuleStartResult {
338 event: Some(event),
339 child: Some(child),
340 transitions,
341 terminal_error: Some(RuntimeBoundaryError::Process(
342 ProcessBoundaryError::Io(format!(
343 "terminate child failed during restart: {err}"
344 )),
345 )),
346 };
347 }
348 apply_restart_backoff(options);
349 state = ModuleHealthState::Restarting;
350 continue;
351 }
352
353 return SuperviseModuleStartResult {
354 event: Some(event),
355 child: Some(child),
356 transitions,
357 terminal_error: None,
358 };
359 }
360 Err(err) => {
361 transitions.push(ModuleHealthTransition {
362 module_id: module.id.clone(),
363 from: Some(state.clone()),
364 to: ModuleHealthState::Failed,
365 attempt: attempts,
366 });
367
368 let should_retry = match module.restart_policy {
369 RestartPolicy::Never => false,
370 RestartPolicy::OnFailure => attempts <= options.on_failure_retry_budget,
371 RestartPolicy::Always => attempts <= options.always_restart_budget,
372 };
373
374 if should_retry {
375 transitions.push(ModuleHealthTransition {
376 module_id: module.id.clone(),
377 from: Some(ModuleHealthState::Failed),
378 to: ModuleHealthState::Restarting,
379 attempt: attempts,
380 });
381 apply_restart_backoff(options);
382 state = ModuleHealthState::Restarting;
383 continue;
384 }
385
386 transitions.push(ModuleHealthTransition {
387 module_id: module.id.clone(),
388 from: Some(ModuleHealthState::Failed),
389 to: ModuleHealthState::Stopped,
390 attempt: attempts,
391 });
392 return SuperviseModuleStartResult {
393 event: None,
394 child: None,
395 transitions,
396 terminal_error: Some(err),
397 };
398 }
399 }
400 }
401}
402
403fn supervise_mcp_module_start(
404 module: &ModuleConfig,
405 pre_spawn: Option<&PreSpawnData>,
406 timeout: Duration,
407) -> SuperviseModuleStartResult {
408 let mut transitions = vec![ModuleHealthTransition {
409 module_id: module.id.clone(),
410 from: None,
411 to: ModuleHealthState::Starting,
412 attempt: 0,
413 }];
414
415 match probe_module_mcp_tools(module, pre_spawn, timeout) {
416 Ok(tools) => {
417 transitions.push(ModuleHealthTransition {
418 module_id: module.id.clone(),
419 from: Some(ModuleHealthState::Starting),
420 to: ModuleHealthState::Healthy,
421 attempt: 1,
422 });
423 SuperviseModuleStartResult {
424 event: Some(mcp_ready_event(module, tools)),
425 child: None,
426 transitions,
427 terminal_error: None,
428 }
429 }
430 Err(error) => {
431 transitions.push(ModuleHealthTransition {
432 module_id: module.id.clone(),
433 from: Some(ModuleHealthState::Starting),
434 to: ModuleHealthState::Failed,
435 attempt: 1,
436 });
437 transitions.push(ModuleHealthTransition {
438 module_id: module.id.clone(),
439 from: Some(ModuleHealthState::Failed),
440 to: ModuleHealthState::Stopped,
441 attempt: 1,
442 });
443 SuperviseModuleStartResult {
444 event: None,
445 child: None,
446 transitions,
447 terminal_error: Some(error),
448 }
449 }
450 }
451}
452
453fn mcp_ready_event(module: &ModuleConfig, tools: Vec<String>) -> EventEnvelope<UnifiedEvent> {
454 let timestamp_ms = current_time_ms();
455 EventEnvelope {
456 event_id: format!("evt-mcp-ready-{}-{timestamp_ms}", module.id),
457 source: "module".to_string(),
458 timestamp_ms,
459 event: UnifiedEvent::Module(ModuleEvent {
460 module: module.id.clone(),
461 event_type: "mcp.ready".to_string(),
462 payload: serde_json::json!({
463 "tools": tools,
464 }),
465 }),
466 }
467}
468
469fn apply_restart_backoff(options: &RuntimeOptions) {
470 if options.supervisor_restart_backoff_ms == 0 {
471 return;
472 }
473 std::thread::sleep(Duration::from_millis(options.supervisor_restart_backoff_ms));
474}
475
476fn spawn_module_capture_first_event(
477 module: &ModuleConfig,
478 pre_spawn: Option<&PreSpawnData>,
479 timeout: Duration,
480 force_terminate_failure: bool,
481) -> Result<(EventEnvelope<UnifiedEvent>, Child), RuntimeBoundaryError> {
482 let env = module_env_with_extra(module, pre_spawn, &[]);
483
484 let mut child = Command::new(&module.command)
485 .args(&module.args)
486 .envs(env.iter().map(|(k, v)| (k, v)))
487 .stdout(Stdio::piped())
488 .stderr(Stdio::null())
489 .spawn()
490 .map_err(|err| {
491 RuntimeBoundaryError::Process(ProcessBoundaryError::SpawnFailed(err.to_string()))
492 })?;
493
494 let stdout = child.stdout.take().ok_or(RuntimeBoundaryError::Process(
495 ProcessBoundaryError::MissingStdout,
496 ))?;
497
498 let (tx, rx) = mpsc::channel();
499 std::thread::spawn(move || {
500 let mut reader = BufReader::new(stdout);
501 let mut line = String::new();
502 let result = reader.read_line(&mut line).map_err(|err| err.to_string());
503 let _ = tx.send((result, line));
504 });
505
506 match rx.recv_timeout(timeout) {
507 Ok((Ok(0), _)) => {
508 let _ = child.wait();
509 Err(RuntimeBoundaryError::Process(
510 ProcessBoundaryError::EmptyOutput,
511 ))
512 }
513 Ok((Ok(_), mut line)) => {
514 if line.ends_with('\n') {
515 line.pop();
516 if line.ends_with('\r') {
517 line.pop();
518 }
519 }
520 match normalize_event_line(&line) {
521 Ok(event) => Ok((event, child)),
522 Err(err) => {
523 if let Err(terminate_err) = terminate_child(&mut child, force_terminate_failure)
524 {
525 return Err(RuntimeBoundaryError::Process(ProcessBoundaryError::Io(
526 format!(
527 "cleanup terminate failed after normalize error: {terminate_err}; normalize_error={err:?}"
528 ),
529 )));
530 }
531 Err(RuntimeBoundaryError::Normalize(err))
532 }
533 }
534 }
535 Ok((Err(err), _)) => {
536 if let Err(terminate_err) = terminate_child(&mut child, force_terminate_failure) {
537 return Err(RuntimeBoundaryError::Process(ProcessBoundaryError::Io(
538 format!(
539 "cleanup terminate failed after io read error: {terminate_err}; io_error={err}"
540 ),
541 )));
542 }
543 Err(RuntimeBoundaryError::Process(ProcessBoundaryError::Io(err)))
544 }
545 Err(_) => {
546 let timeout_ms = timeout.as_millis() as u64;
547 if let Err(terminate_err) = terminate_child(&mut child, force_terminate_failure) {
548 return Err(RuntimeBoundaryError::Process(ProcessBoundaryError::Io(
549 format!(
550 "cleanup terminate failed after timeout({timeout_ms}ms): {terminate_err}"
551 ),
552 )));
553 }
554 Err(RuntimeBoundaryError::Process(
555 ProcessBoundaryError::Timeout { timeout_ms },
556 ))
557 }
558 }
559}
560
/// Makes sure `child` is no longer running, killing and reaping it if necessary.
///
/// When `force_terminate_failure` is set the function fails immediately without touching
/// the child; the flag exists to exercise failure paths in tests.
///
/// # Errors
///
/// Returns a human-readable message when the status probe fails, when the kill fails and
/// the process is still running (or cannot be probed), or when the wait after a successful
/// kill fails.
fn terminate_child(child: &mut Child, force_terminate_failure: bool) -> Result<(), String> {
    if force_terminate_failure {
        return Err("forced terminate failure for testing".to_string());
    }

    let already_exited = match child.try_wait() {
        Ok(status) => status.is_some(),
        Err(err) => return Err(format!("try_wait failed: {err}")),
    };
    if already_exited {
        return Ok(());
    }

    let Err(kill_err) = child.kill() else {
        // Kill was delivered: reap the process so it does not linger as a zombie.
        return match child.wait() {
            Ok(_) => Ok(()),
            Err(err) => Err(format!("wait after kill failed: {err}")),
        };
    };

    // The kill can fail because the process exited in the meantime; probe once more
    // before reporting a failure.
    match child.try_wait() {
        Ok(Some(_)) => Ok(()),
        Ok(None) => Err(format!(
            "kill failed while process still running: {kill_err}"
        )),
        Err(probe_err) => Err(format!(
            "kill failed and process status probe failed: {kill_err}; {probe_err}"
        )),
    }
}
587
588fn supervisor_warning_event(
589 module_id: &str,
590 error: &RuntimeBoundaryError,
591) -> EventEnvelope<UnifiedEvent> {
592 let timestamp_ms = current_time_ms();
593 EventEnvelope {
594 event_id: format!("evt-supervisor-warning-{module_id}-{timestamp_ms}"),
595 source: "module".to_string(),
596 timestamp_ms,
597 event: UnifiedEvent::Module(ModuleEvent {
598 module: module_id.to_string(),
599 event_type: "supervisor.warning".to_string(),
600 payload: serde_json::json!({
601 "error": format!("{error:?}")
602 }),
603 }),
604 }
605}