Skip to main content

meerkat_mobkit/runtime/
supervisor.rs

1//! Module supervisor — process lifecycle, health monitoring, and restart logic.
2
3use super::module_boundary::{module_env_with_extra, module_uses_mcp, probe_module_mcp_tools};
4use super::*;
5
6pub fn run_module_boundary_once(
7    module: &ModuleConfig,
8    pre_spawn: Option<&PreSpawnData>,
9    timeout: Duration,
10) -> Result<EventEnvelope<UnifiedEvent>, RuntimeBoundaryError> {
11    run_module_boundary_with_env(module, pre_spawn, &[], timeout)
12}
13
14pub(super) fn run_module_boundary_with_env(
15    module: &ModuleConfig,
16    pre_spawn: Option<&PreSpawnData>,
17    extra_env: &[(String, String)],
18    timeout: Duration,
19) -> Result<EventEnvelope<UnifiedEvent>, RuntimeBoundaryError> {
20    let env = module_env_with_extra(module, pre_spawn, extra_env);
21    let line = run_process_json_line(&module.command, &module.args, &env, timeout)
22        .map_err(RuntimeBoundaryError::Process)?;
23    normalize_event_line(&line).map_err(RuntimeBoundaryError::Normalize)
24}
25
26pub fn run_discovered_module_once(
27    config: &MobKitConfig,
28    module_id: &str,
29    timeout: Duration,
30) -> Result<EventEnvelope<UnifiedEvent>, RuntimeFromConfigError> {
31    let module = config
32        .modules
33        .iter()
34        .find(|module| module.id == module_id)
35        .ok_or_else(|| {
36            RuntimeFromConfigError::Config(ConfigResolutionError::ModuleNotConfigured(
37                module_id.to_string(),
38            ))
39        })?;
40
41    if !config.discovery.modules.iter().any(|id| id == module_id) {
42        return Err(RuntimeFromConfigError::Config(
43            ConfigResolutionError::ModuleNotDiscovered(module_id.to_string()),
44        ));
45    }
46
47    let pre_spawn = config
48        .pre_spawn
49        .iter()
50        .find(|data| data.module_id == module_id);
51    run_module_boundary_once(module, pre_spawn, timeout).map_err(RuntimeFromConfigError::Runtime)
52}
53
54impl MobkitRuntimeHandle {
55    pub(super) fn is_module_loaded(&self, module_id: &str) -> bool {
56        self.loaded_modules.contains(module_id)
57    }
58    pub fn shutdown(&mut self) -> RuntimeShutdownReport {
59        let mut seq = self
60            .lifecycle_events
61            .last()
62            .map_or(0, |event| event.seq + 1);
63        self.lifecycle_events.push(LifecycleEvent {
64            seq,
65            stage: LifecycleStage::ShutdownRequested,
66        });
67        seq += 1;
68        self.lifecycle_events.push(LifecycleEvent {
69            seq,
70            stage: LifecycleStage::ShutdownComplete,
71        });
72        self.running = false;
73
74        let terminated_modules: Vec<String> = self.loaded_modules.iter().cloned().collect();
75        self.loaded_modules.clear();
76
77        let mut orphan_processes = 0_u32;
78        let children = std::mem::take(&mut self.live_children);
79        for (_, mut child) in children {
80            if terminate_child(&mut child, false).is_err() {
81                orphan_processes += 1;
82            }
83        }
84
85        RuntimeShutdownReport {
86            terminated_modules,
87            orphan_processes,
88        }
89    }
90
91    pub fn is_running(&self) -> bool {
92        self.running
93    }
94
95    pub fn loaded_modules(&self) -> Vec<String> {
96        self.loaded_modules.iter().cloned().collect()
97    }
98
99    pub(super) fn module_and_prespawn(
100        &self,
101        module_id: &str,
102    ) -> Option<(&ModuleConfig, Option<&PreSpawnData>)> {
103        let module = self
104            .config
105            .modules
106            .iter()
107            .find(|module| module.id == module_id)?;
108        let pre_spawn = self
109            .config
110            .pre_spawn
111            .iter()
112            .find(|data| data.module_id == module_id);
113        Some((module, pre_spawn))
114    }
115
116    pub fn reconcile_modules(
117        &mut self,
118        modules: Vec<String>,
119        timeout: Duration,
120    ) -> Result<usize, RuntimeMutationError> {
121        for module_id in &modules {
122            if self
123                .config
124                .modules
125                .iter()
126                .all(|configured| configured.id != *module_id)
127            {
128                return Err(RuntimeMutationError::Config(
129                    ConfigResolutionError::ModuleNotConfigured(module_id.clone()),
130                ));
131            }
132        }
133
134        self.config.discovery.modules = modules.clone();
135        let mut added = 0_usize;
136        for module_id in modules {
137            if self.loaded_modules.contains(&module_id) {
138                continue;
139            }
140            self.spawn_member(&module_id, timeout)?;
141            added += 1;
142        }
143        Ok(added)
144    }
145
146    pub fn spawn_member(
147        &mut self,
148        module_id: &str,
149        timeout: Duration,
150    ) -> Result<(), RuntimeMutationError> {
151        let module = self
152            .config
153            .modules
154            .iter()
155            .find(|module| module.id == module_id)
156            .ok_or_else(|| {
157                RuntimeMutationError::Config(ConfigResolutionError::ModuleNotConfigured(
158                    module_id.to_string(),
159                ))
160            })?;
161
162        let pre_spawn = self
163            .config
164            .pre_spawn
165            .iter()
166            .find(|data| data.module_id == module_id);
167
168        let mut result = supervise_module_start(module, pre_spawn, timeout, &self.runtime_options);
169        self.supervisor_report
170            .transitions
171            .append(&mut result.transitions);
172
173        if let Some(error) = result.terminal_error.clone() {
174            insert_event_sorted(
175                &mut self.merged_events,
176                supervisor_warning_event(module_id, &error),
177            );
178        }
179
180        let Some(event) = result.event else {
181            return Err(RuntimeMutationError::Runtime(
182                result
183                    .terminal_error
184                    .unwrap_or(RuntimeBoundaryError::Process(
185                        ProcessBoundaryError::EmptyOutput,
186                    )),
187            ));
188        };
189        let module_is_mcp = module_uses_mcp(module, pre_spawn);
190
191        if !self
192            .config
193            .discovery
194            .modules
195            .iter()
196            .any(|configured| configured == module_id)
197        {
198            self.config.discovery.modules.push(module_id.to_string());
199        }
200
201        if !module_is_mcp {
202            let Some(mut child) = result.child else {
203                return Err(RuntimeMutationError::Runtime(
204                    result
205                        .terminal_error
206                        .unwrap_or(RuntimeBoundaryError::Process(
207                            ProcessBoundaryError::EmptyOutput,
208                        )),
209                ));
210            };
211            if let Some(mut existing_child) = self.live_children.remove(module_id)
212                && let Err(err) = terminate_child(
213                    &mut existing_child,
214                    self.runtime_options.supervisor_test_force_terminate_failure,
215                )
216            {
217                self.live_children
218                    .insert(module_id.to_string(), existing_child);
219
220                let mut error_message =
221                    format!("failed to terminate existing child before respawn: {err}");
222                if let Err(replacement_err) = terminate_child(&mut child, false) {
223                    error_message.push_str(&format!(
224                        "; failed to terminate replacement child after aborted respawn: {replacement_err}"
225                    ));
226                }
227                let runtime_error =
228                    RuntimeBoundaryError::Process(ProcessBoundaryError::Io(error_message));
229                insert_event_sorted(
230                    &mut self.merged_events,
231                    supervisor_warning_event(module_id, &runtime_error),
232                );
233                return Err(RuntimeMutationError::Runtime(runtime_error));
234            }
235            self.loaded_modules.insert(module_id.to_string());
236            self.live_children.insert(module_id.to_string(), child);
237            insert_event_sorted(&mut self.merged_events, event);
238            return Ok(());
239        }
240
241        if let Some(mut existing_child) = self.live_children.remove(module_id)
242            && let Err(err) = terminate_child(
243                &mut existing_child,
244                self.runtime_options.supervisor_test_force_terminate_failure,
245            )
246        {
247            self.live_children
248                .insert(module_id.to_string(), existing_child);
249            let runtime_error = RuntimeBoundaryError::Process(ProcessBoundaryError::Io(format!(
250                "failed to terminate existing child before MCP respawn: {err}"
251            )));
252            insert_event_sorted(
253                &mut self.merged_events,
254                supervisor_warning_event(module_id, &runtime_error),
255            );
256            return Err(RuntimeMutationError::Runtime(runtime_error));
257        }
258
259        self.loaded_modules.insert(module_id.to_string());
260        insert_event_sorted(&mut self.merged_events, event);
261        Ok(())
262    }
263}
264
265pub(super) struct SuperviseModuleStartResult {
266    pub event: Option<EventEnvelope<UnifiedEvent>>,
267    pub child: Option<Child>,
268    pub transitions: Vec<ModuleHealthTransition>,
269    pub terminal_error: Option<RuntimeBoundaryError>,
270}
271
272pub(super) fn supervise_module_start(
273    module: &ModuleConfig,
274    pre_spawn: Option<&PreSpawnData>,
275    timeout: Duration,
276    options: &RuntimeOptions,
277) -> SuperviseModuleStartResult {
278    if module_uses_mcp(module, pre_spawn) {
279        return supervise_mcp_module_start(module, pre_spawn, timeout);
280    }
281
282    let mut transitions = vec![ModuleHealthTransition {
283        module_id: module.id.clone(),
284        from: None,
285        to: ModuleHealthState::Starting,
286        attempt: 0,
287    }];
288
289    let mut attempts = 0_u32;
290    let mut state = ModuleHealthState::Starting;
291
292    loop {
293        attempts += 1;
294        let result = spawn_module_capture_first_event(
295            module,
296            pre_spawn,
297            timeout,
298            options.supervisor_test_force_terminate_failure,
299        );
300
301        match result {
302            Ok((event, mut child)) => {
303                transitions.push(ModuleHealthTransition {
304                    module_id: module.id.clone(),
305                    from: Some(state.clone()),
306                    to: ModuleHealthState::Healthy,
307                    attempt: attempts,
308                });
309
310                let should_restart = match module.restart_policy {
311                    RestartPolicy::Always => attempts <= options.always_restart_budget,
312                    _ => false,
313                };
314
315                if should_restart {
316                    transitions.push(ModuleHealthTransition {
317                        module_id: module.id.clone(),
318                        from: Some(ModuleHealthState::Healthy),
319                        to: ModuleHealthState::Restarting,
320                        attempt: attempts,
321                    });
322                    if let Err(err) =
323                        terminate_child(&mut child, options.supervisor_test_force_terminate_failure)
324                    {
325                        transitions.push(ModuleHealthTransition {
326                            module_id: module.id.clone(),
327                            from: Some(ModuleHealthState::Restarting),
328                            to: ModuleHealthState::Failed,
329                            attempt: attempts,
330                        });
331                        transitions.push(ModuleHealthTransition {
332                            module_id: module.id.clone(),
333                            from: Some(ModuleHealthState::Failed),
334                            to: ModuleHealthState::Healthy,
335                            attempt: attempts,
336                        });
337                        return SuperviseModuleStartResult {
338                            event: Some(event),
339                            child: Some(child),
340                            transitions,
341                            terminal_error: Some(RuntimeBoundaryError::Process(
342                                ProcessBoundaryError::Io(format!(
343                                    "terminate child failed during restart: {err}"
344                                )),
345                            )),
346                        };
347                    }
348                    apply_restart_backoff(options);
349                    state = ModuleHealthState::Restarting;
350                    continue;
351                }
352
353                return SuperviseModuleStartResult {
354                    event: Some(event),
355                    child: Some(child),
356                    transitions,
357                    terminal_error: None,
358                };
359            }
360            Err(err) => {
361                transitions.push(ModuleHealthTransition {
362                    module_id: module.id.clone(),
363                    from: Some(state.clone()),
364                    to: ModuleHealthState::Failed,
365                    attempt: attempts,
366                });
367
368                let should_retry = match module.restart_policy {
369                    RestartPolicy::Never => false,
370                    RestartPolicy::OnFailure => attempts <= options.on_failure_retry_budget,
371                    RestartPolicy::Always => attempts <= options.always_restart_budget,
372                };
373
374                if should_retry {
375                    transitions.push(ModuleHealthTransition {
376                        module_id: module.id.clone(),
377                        from: Some(ModuleHealthState::Failed),
378                        to: ModuleHealthState::Restarting,
379                        attempt: attempts,
380                    });
381                    apply_restart_backoff(options);
382                    state = ModuleHealthState::Restarting;
383                    continue;
384                }
385
386                transitions.push(ModuleHealthTransition {
387                    module_id: module.id.clone(),
388                    from: Some(ModuleHealthState::Failed),
389                    to: ModuleHealthState::Stopped,
390                    attempt: attempts,
391                });
392                return SuperviseModuleStartResult {
393                    event: None,
394                    child: None,
395                    transitions,
396                    terminal_error: Some(err),
397                };
398            }
399        }
400    }
401}
402
403fn supervise_mcp_module_start(
404    module: &ModuleConfig,
405    pre_spawn: Option<&PreSpawnData>,
406    timeout: Duration,
407) -> SuperviseModuleStartResult {
408    let mut transitions = vec![ModuleHealthTransition {
409        module_id: module.id.clone(),
410        from: None,
411        to: ModuleHealthState::Starting,
412        attempt: 0,
413    }];
414
415    match probe_module_mcp_tools(module, pre_spawn, timeout) {
416        Ok(tools) => {
417            transitions.push(ModuleHealthTransition {
418                module_id: module.id.clone(),
419                from: Some(ModuleHealthState::Starting),
420                to: ModuleHealthState::Healthy,
421                attempt: 1,
422            });
423            SuperviseModuleStartResult {
424                event: Some(mcp_ready_event(module, tools)),
425                child: None,
426                transitions,
427                terminal_error: None,
428            }
429        }
430        Err(error) => {
431            transitions.push(ModuleHealthTransition {
432                module_id: module.id.clone(),
433                from: Some(ModuleHealthState::Starting),
434                to: ModuleHealthState::Failed,
435                attempt: 1,
436            });
437            transitions.push(ModuleHealthTransition {
438                module_id: module.id.clone(),
439                from: Some(ModuleHealthState::Failed),
440                to: ModuleHealthState::Stopped,
441                attempt: 1,
442            });
443            SuperviseModuleStartResult {
444                event: None,
445                child: None,
446                transitions,
447                terminal_error: Some(error),
448            }
449        }
450    }
451}
452
453fn mcp_ready_event(module: &ModuleConfig, tools: Vec<String>) -> EventEnvelope<UnifiedEvent> {
454    let timestamp_ms = current_time_ms();
455    EventEnvelope {
456        event_id: format!("evt-mcp-ready-{}-{timestamp_ms}", module.id),
457        source: "module".to_string(),
458        timestamp_ms,
459        event: UnifiedEvent::Module(ModuleEvent {
460            module: module.id.clone(),
461            event_type: "mcp.ready".to_string(),
462            payload: serde_json::json!({
463                "tools": tools,
464            }),
465        }),
466    }
467}
468
469fn apply_restart_backoff(options: &RuntimeOptions) {
470    if options.supervisor_restart_backoff_ms == 0 {
471        return;
472    }
473    std::thread::sleep(Duration::from_millis(options.supervisor_restart_backoff_ms));
474}
475
476fn spawn_module_capture_first_event(
477    module: &ModuleConfig,
478    pre_spawn: Option<&PreSpawnData>,
479    timeout: Duration,
480    force_terminate_failure: bool,
481) -> Result<(EventEnvelope<UnifiedEvent>, Child), RuntimeBoundaryError> {
482    let env = module_env_with_extra(module, pre_spawn, &[]);
483
484    let mut child = Command::new(&module.command)
485        .args(&module.args)
486        .envs(env.iter().map(|(k, v)| (k, v)))
487        .stdout(Stdio::piped())
488        .stderr(Stdio::null())
489        .spawn()
490        .map_err(|err| {
491            RuntimeBoundaryError::Process(ProcessBoundaryError::SpawnFailed(err.to_string()))
492        })?;
493
494    let stdout = child.stdout.take().ok_or(RuntimeBoundaryError::Process(
495        ProcessBoundaryError::MissingStdout,
496    ))?;
497
498    let (tx, rx) = mpsc::channel();
499    std::thread::spawn(move || {
500        let mut reader = BufReader::new(stdout);
501        let mut line = String::new();
502        let result = reader.read_line(&mut line).map_err(|err| err.to_string());
503        let _ = tx.send((result, line));
504    });
505
506    match rx.recv_timeout(timeout) {
507        Ok((Ok(0), _)) => {
508            let _ = child.wait();
509            Err(RuntimeBoundaryError::Process(
510                ProcessBoundaryError::EmptyOutput,
511            ))
512        }
513        Ok((Ok(_), mut line)) => {
514            if line.ends_with('\n') {
515                line.pop();
516                if line.ends_with('\r') {
517                    line.pop();
518                }
519            }
520            match normalize_event_line(&line) {
521                Ok(event) => Ok((event, child)),
522                Err(err) => {
523                    if let Err(terminate_err) = terminate_child(&mut child, force_terminate_failure)
524                    {
525                        return Err(RuntimeBoundaryError::Process(ProcessBoundaryError::Io(
526                            format!(
527                                "cleanup terminate failed after normalize error: {terminate_err}; normalize_error={err:?}"
528                            ),
529                        )));
530                    }
531                    Err(RuntimeBoundaryError::Normalize(err))
532                }
533            }
534        }
535        Ok((Err(err), _)) => {
536            if let Err(terminate_err) = terminate_child(&mut child, force_terminate_failure) {
537                return Err(RuntimeBoundaryError::Process(ProcessBoundaryError::Io(
538                    format!(
539                        "cleanup terminate failed after io read error: {terminate_err}; io_error={err}"
540                    ),
541                )));
542            }
543            Err(RuntimeBoundaryError::Process(ProcessBoundaryError::Io(err)))
544        }
545        Err(_) => {
546            let timeout_ms = timeout.as_millis() as u64;
547            if let Err(terminate_err) = terminate_child(&mut child, force_terminate_failure) {
548                return Err(RuntimeBoundaryError::Process(ProcessBoundaryError::Io(
549                    format!(
550                        "cleanup terminate failed after timeout({timeout_ms}ms): {terminate_err}"
551                    ),
552                )));
553            }
554            Err(RuntimeBoundaryError::Process(
555                ProcessBoundaryError::Timeout { timeout_ms },
556            ))
557        }
558    }
559}
560
/// Makes sure `child` is no longer running and has been reaped.
///
/// Returns `Ok(())` if the process had already exited, or if it was killed
/// and then waited on successfully. A failed `kill` is still treated as
/// success when a follow-up status probe shows the process has exited.
///
/// When `force_terminate_failure` is set, nothing is attempted and a fixed
/// error is returned; the flag exists to simulate termination failures in
/// tests.
///
/// # Errors
///
/// Returns a human-readable message describing which step failed.
fn terminate_child(child: &mut Child, force_terminate_failure: bool) -> Result<(), String> {
    if force_terminate_failure {
        return Err("forced terminate failure for testing".to_string());
    }

    let exit_status = child
        .try_wait()
        .map_err(|err| format!("try_wait failed: {err}"))?;
    if exit_status.is_some() {
        return Ok(());
    }

    match child.kill() {
        Ok(()) => match child.wait() {
            Ok(_) => Ok(()),
            Err(err) => Err(format!("wait after kill failed: {err}")),
        },
        // The kill may have failed only because the process exited in the
        // meantime, so probe once more before reporting an error.
        Err(kill_err) => match child.try_wait() {
            Ok(Some(_)) => Ok(()),
            Ok(None) => Err(format!(
                "kill failed while process still running: {kill_err}"
            )),
            Err(probe_err) => Err(format!(
                "kill failed and process status probe failed: {kill_err}; {probe_err}"
            )),
        },
    }
}
587
588fn supervisor_warning_event(
589    module_id: &str,
590    error: &RuntimeBoundaryError,
591) -> EventEnvelope<UnifiedEvent> {
592    let timestamp_ms = current_time_ms();
593    EventEnvelope {
594        event_id: format!("evt-supervisor-warning-{module_id}-{timestamp_ms}"),
595        source: "module".to_string(),
596        timestamp_ms,
597        event: UnifiedEvent::Module(ModuleEvent {
598            module: module_id.to_string(),
599            event_type: "supervisor.warning".to_string(),
600            payload: serde_json::json!({
601                "error": format!("{error:?}")
602            }),
603        }),
604    }
605}