Skip to main content

lean_rs_worker_parent/
supervisor.rs

1use std::collections::VecDeque;
2use std::ffi::OsString;
3use std::fmt;
4use std::io::{BufReader, BufWriter, Read as _};
5use std::path::{Path, PathBuf};
6use std::process::{Child, ChildStderr, ChildStdin, ChildStdout, Command, ExitStatus, Stdio};
7use std::sync::Mutex;
8use std::sync::mpsc;
9use std::thread;
10use std::time::{Duration, Instant};
11
12use lean_rs_worker_protocol::protocol::{
13    HostSessionMode, MAX_FRAME_BYTES, MAX_FRAME_BYTES_HARD_CAP, MIN_FRAME_BYTES, Message, Request, Response,
14    read_frame, write_frame,
15};
16use lean_rs_worker_protocol::types::{
17    LeanWorkerCapabilityMetadata, LeanWorkerDeclarationFilter, LeanWorkerDeclarationInspectionRequest,
18    LeanWorkerDeclarationInspectionResult, LeanWorkerDeclarationRow, LeanWorkerDeclarationSearch,
19    LeanWorkerDeclarationSearchResult, LeanWorkerDeclarationType, LeanWorkerDeclarationVerificationBatchRequest,
20    LeanWorkerDeclarationVerificationBatchResult, LeanWorkerDeclarationVerificationBatchRow,
21    LeanWorkerDeclarationVerificationFacts, LeanWorkerDeclarationVerificationRequest,
22    LeanWorkerDeclarationVerificationResult, LeanWorkerDeclarationVerificationStatus, LeanWorkerDoctorReport,
23    LeanWorkerElabOptions, LeanWorkerElabResult, LeanWorkerImportStats, LeanWorkerKernelResult, LeanWorkerMetaResult,
24    LeanWorkerMetaTransparency, LeanWorkerModuleQuery, LeanWorkerModuleQueryBatchEnvelope,
25    LeanWorkerModuleQueryBatchItem, LeanWorkerModuleQueryBatchOutcome, LeanWorkerModuleQueryCacheFacts,
26    LeanWorkerModuleQueryOutcome, LeanWorkerModuleQuerySelector, LeanWorkerModuleSnapshotCacheClearResult,
27    LeanWorkerOutputBudgets, LeanWorkerProofAttemptRequest, LeanWorkerProofAttemptResult, LeanWorkerRendered,
28    LeanWorkerResourceExhaustedFacts,
29};
30use lean_rs_worker_protocol::worker_exports::{fixture_mul_signature, fixture_panic_signature};
31
32use crate::capability::LeanWorkerBootstrapDiagnosticCode;
33use crate::session::LeanWorkerDataSinkTarget;
34use crate::session::{
35    LeanWorkerCancellationToken, LeanWorkerDataSink, LeanWorkerDiagnosticSink, LeanWorkerProgressSink,
36    LeanWorkerRawDataRow, LeanWorkerRawDataSink, LeanWorkerRuntimeMetadata, LeanWorkerSessionConfig,
37    LeanWorkerSessionMode, LeanWorkerStreamSummary, check_cancelled, elapsed_event, report_parent_data_row,
38    report_parent_diagnostic, report_parent_progress,
39};
40
/// Initial `startup_timeout` of a freshly constructed `LeanWorkerConfig`:
/// the maximum time to wait for the child handshake.
const DEFAULT_STARTUP_TIMEOUT: Duration = Duration::from_secs(10);
/// NOTE(review): not referenced in this part of the file. Presumably the
/// capacity of the bounded worker-event buffer whose waits are counted in
/// `LeanWorkerStats::backpressure_waits` — confirm at the use site.
const WORKER_EVENT_BUFFER_CAPACITY: usize = 64;
/// Default `max_restarts` of the restart-intensity limit.
const DEFAULT_RESTART_INTENSITY_LIMIT: u64 = 16;
/// Default moving time window of the restart-intensity limit.
const DEFAULT_RESTART_INTENSITY_WINDOW: Duration = Duration::from_mins(1);

/// Default deadline for one worker request after startup.
///
/// This is the initial `request_timeout` of `LeanWorkerConfig::new`.
pub const LEAN_WORKER_REQUEST_TIMEOUT_DEFAULT: Duration = Duration::from_secs(30);

/// Suggested deadline for long-running worker requests.
///
/// Applied by `LeanWorkerConfig::long_running_requests`.
pub const LEAN_WORKER_REQUEST_TIMEOUT_LONG_RUNNING: Duration = Duration::from_mins(10);

/// Default deadline for graceful child shutdown before kill escalation.
///
/// This is the initial `shutdown_timeout` of `LeanWorkerConfig::new`.
pub const LEAN_WORKER_SHUTDOWN_TIMEOUT_DEFAULT: Duration = Duration::from_secs(2);

/// Default deadline for waiting on a killed child process to be reaped.
pub const LEAN_WORKER_KILL_WAIT_TIMEOUT_DEFAULT: Duration = Duration::from_secs(5);
57
/// Configuration for starting a `lean-rs-worker` child process.
///
/// The executable should be the `lean-rs-worker-child` binary.
///
/// **Worker-child panic policy.** The supervisor spawns every child with two
/// defaults that together pin a process boundary around Lean panics:
///
/// - `LEAN_ABORT_ON_PANIC=1`—Lean internal panics terminate the child
///   instead of returning default values, so the parent observes a fatal
///   exit rather than silently-corrupted state.
/// - `LEAN_BACKTRACE=0`—Lean's panic-time backtrace handler is skipped.
///   Since Lean 4.30 that handler calls back into Lean code (the demangler
///   is now `@[export]`'d from `Lean.Compiler.NameDemangling`); a worker
///   child embeds a minimal Lean and cannot guarantee that callback's
///   transitive module dependencies are initialized when user code panics.
///   Disabling the backtrace removes that dependency entirely. See
///   `docs/architecture/06-panic-containment.md` for the boundary argument.
///
/// Both are safe defaults—explicit `.env()` entries supplied here override
/// them, in case a caller knows the dependency is satisfied and wants a
/// demangled backtrace on the child's stderr.
///
/// The value is assembled builder-style: start from `LeanWorkerConfig::new`
/// and chain the consuming setters.
#[derive(Clone, Debug)]
pub struct LeanWorkerConfig {
    /// Path of the child executable to spawn.
    executable: PathBuf,
    /// Child working directory; `None` until `current_dir` is called.
    current_dir: Option<PathBuf>,
    /// Child environment entries, in the order they were added via `env`.
    env: Vec<(OsString, OsString)>,
    /// Maximum time to wait for the child handshake.
    startup_timeout: Duration,
    /// Maximum time to wait for one request's terminal response.
    request_timeout: Duration,
    /// Maximum time to wait for graceful shutdown before kill escalation.
    shutdown_timeout: Duration,
    /// Policy for cycling the child before the next request.
    restart_policy: LeanWorkerRestartPolicy,
    /// In-flight RSS kill limit in KiB; `None` unless `rss_hard_limit` was
    /// called, which stores a value of at least 1.
    rss_hard_limit_kib: Option<u64>,
    /// Interval between in-flight RSS samples; 250 ms by default, and at
    /// least 1 ms when set through `rss_hard_limit`.
    rss_sample_interval: Duration,
    /// Per-frame byte cap negotiated with the child at handshake; the setter
    /// clamps it into `MIN_FRAME_BYTES..=MAX_FRAME_BYTES_HARD_CAP`.
    max_frame_bytes: u32,
}
92
93impl LeanWorkerConfig {
94    /// Create a worker configuration for a child executable.
95    pub fn new(executable: impl Into<PathBuf>) -> Self {
96        Self {
97            executable: executable.into(),
98            current_dir: None,
99            env: Vec::new(),
100            startup_timeout: DEFAULT_STARTUP_TIMEOUT,
101            request_timeout: LEAN_WORKER_REQUEST_TIMEOUT_DEFAULT,
102            shutdown_timeout: LEAN_WORKER_SHUTDOWN_TIMEOUT_DEFAULT,
103            restart_policy: LeanWorkerRestartPolicy::default(),
104            rss_hard_limit_kib: None,
105            rss_sample_interval: Duration::from_millis(250),
106            max_frame_bytes: MAX_FRAME_BYTES,
107        }
108    }
109
110    /// Return the child executable path.
111    pub fn executable(&self) -> &Path {
112        &self.executable
113    }
114
115    /// Set the child working directory.
116    #[must_use]
117    pub fn current_dir(mut self, path: impl Into<PathBuf>) -> Self {
118        self.current_dir = Some(path.into());
119        self
120    }
121
122    /// Add or override one child environment variable.
123    #[must_use]
124    pub fn env(mut self, key: impl Into<OsString>, value: impl Into<OsString>) -> Self {
125        self.env.push((key.into(), value.into()));
126        self
127    }
128
129    #[cfg(test)]
130    pub(crate) fn env_overrides(&self) -> &[(OsString, OsString)] {
131        &self.env
132    }
133
134    /// Set the maximum time to wait for the child handshake.
135    #[must_use]
136    pub fn startup_timeout(mut self, timeout: Duration) -> Self {
137        self.startup_timeout = timeout;
138        self
139    }
140
141    /// Set the maximum time to wait for one request's terminal response.
142    ///
143    /// The request timeout starts after the request frame is written. It covers
144    /// live rows, diagnostics, progress events, and the terminal response. On
145    /// timeout, the supervisor kills and replaces the child process.
146    #[must_use]
147    pub fn request_timeout(mut self, timeout: Duration) -> Self {
148        self.request_timeout = timeout;
149        self
150    }
151
152    /// Set the maximum time to wait for graceful worker shutdown.
153    ///
154    /// Explicit shutdown and `Drop` both ask the child to terminate first.
155    /// If this deadline expires before the child exits, the supervisor kills
156    /// the process and waits for it to be reaped.
157    #[must_use]
158    pub fn shutdown_timeout(mut self, timeout: Duration) -> Self {
159        self.shutdown_timeout = timeout;
160        self
161    }
162
163    /// Use the documented long-running request timeout profile.
164    #[must_use]
165    pub fn long_running_requests(mut self) -> Self {
166        self.request_timeout = LEAN_WORKER_REQUEST_TIMEOUT_LONG_RUNNING;
167        self
168    }
169
170    /// Set the worker restart policy.
171    ///
172    /// Policy checks run before requests enter the child. A policy restart is a
173    /// process restart; it is the only supported reset for Lean process-global
174    /// runtime and import state.
175    #[must_use]
176    pub fn restart_policy(mut self, policy: LeanWorkerRestartPolicy) -> Self {
177        self.restart_policy = policy;
178        self
179    }
180
181    /// Kill and replace the worker during an in-flight request if its RSS is
182    /// sampled at or above `limit_kib`.
183    ///
184    /// This is a protective hard stop, not a throughput optimization. The
185    /// request that crosses the limit returns
186    /// [`LeanWorkerError::RssHardLimitExceeded`] after the child is replaced.
187    #[must_use]
188    pub fn rss_hard_limit(mut self, limit_kib: u64, sample_interval: Duration) -> Self {
189        self.rss_hard_limit_kib = Some(limit_kib.max(1));
190        self.rss_sample_interval = sample_interval.max(Duration::from_millis(1));
191        self
192    }
193
194    /// Set the per-frame byte cap negotiated with the worker child at handshake.
195    ///
196    /// The cap is announced to the child immediately after its handshake frame
197    /// and applies in both directions for the lifetime of the connection. The
198    /// default is [`MAX_FRAME_BYTES`] (1 MiB), which is enough for every
199    /// session-backed tool whose result composes from many frames. Capabilities
200    /// whose *single* logical result is a frame—e.g. an outline of an
201    /// entire module, or a diagnostics snapshot of a refactor-in-progress
202    /// file—can raise the cap here to admit larger envelopes.
203    ///
204    /// Values are clamped into <code>[[MIN_FRAME_BYTES], [MAX_FRAME_BYTES_HARD_CAP]]</code>.
205    /// The floor keeps even a malformed setter from breaking the handshake
206    /// itself; the ceiling prevents the memory-safety policy from being
207    /// defeated by an absurd value.
208    #[must_use]
209    pub fn max_frame_bytes(mut self, max_frame_bytes: u32) -> Self {
210        self.max_frame_bytes = max_frame_bytes.clamp(MIN_FRAME_BYTES, MAX_FRAME_BYTES_HARD_CAP);
211        self
212    }
213}
214
/// Policy for cycling a worker child before the next request.
///
/// The policy resets retained Lean runtime memory only by restarting the
/// process. It does not change `lean-rs-host`'s in-process memory model, and it
/// does not imply that `SessionPool::drain()` can return process-global Lean
/// memory to the OS.
///
/// Every limit is optional; the derived `Default` leaves all of them unset
/// and uses the default restart-intensity limit.
#[derive(Clone, Debug, Default, Eq, PartialEq)]
pub struct LeanWorkerRestartPolicy {
    /// Restart before a request once this many requests have entered the child.
    max_requests: Option<u64>,
    /// Restart before an import-like request once this many imports have run.
    max_imports: Option<u64>,
    /// Restart before a request when measured child RSS is at least this many KiB.
    max_rss_kib: Option<u64>,
    /// Restart before a request when the worker has been idle for this long.
    idle_restart_after: Option<Duration>,
    /// Cap on how many restarts are allowed within one moving time window.
    restart_intensity: RestartIntensityLimit,
}
229
/// Restart-intensity cap: at most `max_restarts` restarts per moving `window`.
///
/// Configured through `LeanWorkerRestartPolicy::max_restarts_per_window`,
/// which keeps `max_restarts` at 1 or more and `window` at 1 ms or more.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
struct RestartIntensityLimit {
    /// Number of restarts tolerated inside one window.
    max_restarts: u64,
    /// Length of the moving time window.
    window: Duration,
}
235
236impl Default for RestartIntensityLimit {
237    fn default() -> Self {
238        Self {
239            max_restarts: DEFAULT_RESTART_INTENSITY_LIMIT,
240            window: DEFAULT_RESTART_INTENSITY_WINDOW,
241        }
242    }
243}
244
245impl LeanWorkerRestartPolicy {
246    /// Disable automatic policy restarts.
247    ///
248    /// Use only for short-lived tests, benchmarks, or hosts that enforce a
249    /// process memory boundary elsewhere. Long-running Lean hosts should use
250    /// [`Self::memory_bounded`] and pair it with `LeanWorkerPoolConfig` total
251    /// and per-worker RSS budgets, because fresh imports retain Lean
252    /// process-global state until the child exits.
253    #[must_use]
254    pub fn disabled() -> Self {
255        Self::default()
256    }
257
258    /// Restart before fresh-import-like requests or RSS growth can accumulate
259    /// without bound in one child process.
260    ///
261    /// The production default shape is one full-session import per worker
262    /// under a measured local RSS cap:
263    ///
264    /// ```rust
265    /// # use lean_rs_worker_parent::LeanWorkerRestartPolicy;
266    /// let policy = LeanWorkerRestartPolicy::memory_bounded(1, 1_572_864);
267    /// ```
268    ///
269    /// This is admission and cycling policy, not memory reclamation inside a
270    /// running Lean process.
271    #[must_use]
272    pub fn memory_bounded(max_imports: u64, max_rss_kib: u64) -> Self {
273        Self::default().max_imports(max_imports).max_rss_kib(max_rss_kib)
274    }
275
276    /// Restart before a request when this many requests have entered the child.
277    #[must_use]
278    pub fn max_requests(mut self, limit: u64) -> Self {
279        self.max_requests = Some(limit.max(1));
280        self
281    }
282
283    /// Restart before an import-like request when this many imports have run.
284    #[must_use]
285    pub fn max_imports(mut self, limit: u64) -> Self {
286        self.max_imports = Some(limit.max(1));
287        self
288    }
289
290    /// Restart before a request when measured child RSS is at least this many KiB.
291    ///
292    /// RSS measurement is best effort. It is implemented for the current
293    /// supported Unix development targets; unsupported platforms skip the
294    /// check and increment `LeanWorkerStats::rss_samples_unavailable`.
295    #[must_use]
296    pub fn max_rss_kib(mut self, limit: u64) -> Self {
297        self.max_rss_kib = Some(limit.max(1));
298        self
299    }
300
301    /// Restart before a request when the worker has been idle for this long.
302    #[must_use]
303    pub fn idle_restart_after(mut self, duration: Duration) -> Self {
304        self.idle_restart_after = Some(duration);
305        self
306    }
307
308    /// Refuse replacement after this many restarts in one moving time window.
309    ///
310    /// The limit is enforced after the old child reaches a terminal state and
311    /// before a replacement is spawned. Exhaustion returns
312    /// [`LeanWorkerError::RestartLimitExceeded`] and leaves the supervisor
313    /// without an accepted child; create a new worker or pool entry to apply a
314    /// fresh restart window.
315    #[must_use]
316    pub fn max_restarts_per_window(mut self, max_restarts: u64, window: Duration) -> Self {
317        self.restart_intensity = RestartIntensityLimit {
318            max_restarts: max_restarts.max(1),
319            window: window.max(Duration::from_millis(1)),
320        };
321        self
322    }
323}
324
/// Reason recorded for the latest worker cycle.
///
/// `stable_cause` maps each variant to a short, payload-free cause name.
#[derive(Clone, Debug, Eq, PartialEq)]
pub enum LeanWorkerRestartReason {
    /// The caller explicitly requested a process cycle.
    Explicit,
    /// Request count reached the configured limit before the next request.
    /// `limit` is that configured limit.
    MaxRequests { limit: u64 },
    /// Import-like request count reached the configured limit before the next import.
    /// `limit` is that configured limit.
    MaxImports { limit: u64 },
    /// Child resident set size reached the configured limit.
    RssCeiling {
        /// Child RSS in KiB that triggered the restart.
        current_kib: u64,
        /// Configured RSS limit in KiB.
        limit_kib: u64,
        /// Import statistics attached to the restart, if any.
        /// NOTE(review): presumably copied from
        /// `LeanWorkerStats::last_import_stats` — confirm where this is built.
        last_import_stats: Option<LeanWorkerImportStats>,
    },
    /// Child resident set size crossed the hard in-flight kill limit.
    RssHardLimit {
        /// Label of the request that was in flight.
        operation: &'static str,
        /// Child RSS in KiB that triggered the kill.
        current_kib: u64,
        /// Configured hard limit in KiB.
        limit_kib: u64,
        /// Import statistics attached to the restart, if any.
        /// NOTE(review): presumably copied from
        /// `LeanWorkerStats::last_import_stats` — confirm where this is built.
        last_import_stats: Option<LeanWorkerImportStats>,
    },
    /// Worker was idle at least as long as the configured limit.
    /// `idle_for` is the observed idle time and `limit` the configured one.
    Idle { idle_for: Duration, limit: Duration },
    /// Parent-side cancellation replaced the child during an in-flight request.
    /// `operation` labels that request.
    Cancelled { operation: &'static str },
    /// Parent-side request timeout replaced the child during an in-flight request.
    RequestTimeout {
        /// Label of the request that was in flight.
        operation: &'static str,
        /// Timeout duration associated with the restart.
        /// NOTE(review): presumably the configured request timeout rather
        /// than the measured elapsed time — confirm.
        duration: Duration,
    },
    /// The child aborted (SIGABRT / fatal panic) during an in-flight request and
    /// the supervisor respawned it. Used by the read-only verify/proof-state
    /// guard that converts such an abort into a degraded verdict instead of a
    /// hard error. `operation` labels the in-flight request.
    ChildAbort { operation: &'static str },
}
362
363impl LeanWorkerRestartReason {
364    /// Stable wire/policy cause name for this restart reason.
365    ///
366    /// This is intentionally smaller than the full enum payload: callers can
367    /// branch on the cause while still using the typed enum when they need
368    /// details such as limits or durations.
369    #[must_use]
370    pub const fn stable_cause(&self) -> &'static str {
371        match self {
372            Self::Explicit => "explicit",
373            Self::MaxRequests { .. } => "max_requests",
374            Self::MaxImports { .. } => "max_imports",
375            Self::RssCeiling { .. } => "rss_ceiling",
376            Self::RssHardLimit { .. } => "rss_hard_limit",
377            Self::Idle { .. } => "idle",
378            Self::Cancelled { .. } => "cancelled",
379            Self::RequestTimeout { .. } => "timeout",
380            Self::ChildAbort { .. } => "child_abort",
381        }
382    }
383}
384
/// Timing facts for the most recent synchronous worker replacement.
///
/// These are observability facts only. A replacement remains a process cycle
/// hidden behind supervisor or pool policy; callers do not receive child
/// identities or lifecycle handles.
///
/// NOTE(review): the per-field descriptions below are inferred from the
/// matching `LeanWorkerStats::last_*_elapsed` fields; confirm them where this
/// struct is populated.
#[derive(Clone, Debug, Default, Eq, PartialEq)]
pub struct LeanWorkerReplacementTiming {
    /// Presumably the worker spawn and protocol-handshake time.
    pub spawn_handshake: Duration,
    /// Presumably the capability build/load time.
    pub capability_load: Duration,
    /// Presumably the host-session open/import time.
    pub session_open_import: Duration,
    /// Presumably the first command time after the replacement, when one was measured.
    pub first_command: Option<Duration>,
    /// Presumably a warm command time on the replaced worker, when one was measured.
    pub warm_command: Option<Duration>,
    /// Presumably the total time of the replacement.
    pub replacement_total: Duration,
    /// Rendered replacement reason.
    /// NOTE(review): possibly a `LeanWorkerRestartReason::stable_cause` name — confirm.
    pub replacement_reason: String,
    /// Rendered replacement budget status; the set of values is not visible here.
    pub replacement_budget_status: String,
}
401
/// Snapshot of worker lifecycle counters.
///
/// The derived `Default` starts every counter at zero and every optional
/// fact at `None`. The restart counters are maintained by `record_restart`
/// with saturating arithmetic.
#[derive(Clone, Debug, Default, Eq, PartialEq)]
pub struct LeanWorkerStats {
    /// Requests that entered a worker child.
    pub requests: u64,
    /// Import-like requests that entered a worker child.
    pub imports: u64,
    /// Import-like requests that reached the supervisor admission gate.
    pub import_like_admission_attempts: u64,
    /// Import-like requests admitted past parent-side restart/RSS policy.
    pub import_like_admitted: u64,
    /// Last sampled child RSS before an import-like request was admitted, when available.
    pub last_import_like_rss_before_admission_kib: Option<u64>,
    /// Child exits observed by the supervisor, including policy cycles.
    pub exits: u64,
    /// Restarts recorded by the supervisor for any reason: policy limits,
    /// explicit cycles, cancellation, request timeouts, RSS hard-limit kills,
    /// and child aborts.
    pub restarts: u64,
    /// Explicit process cycles.
    pub explicit_cycles: u64,
    /// Restarts caused by `LeanWorkerRestartPolicy::max_requests`.
    pub max_request_restarts: u64,
    /// Restarts caused by `LeanWorkerRestartPolicy::max_imports`.
    pub max_import_restarts: u64,
    /// Restarts caused by RSS limits: both the pre-request ceiling from
    /// `LeanWorkerRestartPolicy::max_rss_kib` and the in-flight hard limit
    /// (`LeanWorkerRestartReason::RssHardLimit`).
    pub rss_restarts: u64,
    /// Restarts caused by `LeanWorkerRestartPolicy::idle_restart_after`.
    pub idle_restarts: u64,
    /// Restarts caused by parent-side cancellation of an in-flight request.
    pub cancelled_restarts: u64,
    /// Restarts caused by parent-side request timeouts.
    pub timeout_restarts: u64,
    /// RSS checks skipped because the platform did not provide a usable sample.
    pub rss_samples_unavailable: u64,
    /// Last measured child RSS in KiB, when a policy check could sample it.
    pub last_rss_kib: Option<u64>,
    /// Most recent restart reason, if any.
    pub last_restart_reason: Option<LeanWorkerRestartReason>,
    /// Worker replacement attempts performed synchronously by this supervisor.
    pub replacement_attempts: u64,
    /// Successful worker replacements.
    pub replacement_successes: u64,
    /// Failed worker replacements.
    pub replacement_failures: u64,
    /// Replacements admitted by the configured policy without overlapping children.
    pub replacement_budget_admitted: u64,
    /// Replacement attempts skipped by a budget guard.
    pub replacement_budget_skipped: u64,
    /// Most recent replacement timing facts, if a replacement has succeeded.
    pub last_replacement_timing: Option<LeanWorkerReplacementTiming>,
    /// Most recent skipped replacement reason, if any.
    pub last_replacement_skipped_reason: Option<String>,
    /// Most recent worker spawn and protocol-handshake elapsed time.
    pub last_spawn_handshake_elapsed: Option<Duration>,
    /// Most recent capability build/load phase elapsed time observed by a capability builder.
    pub last_capability_load_elapsed: Option<Duration>,
    /// Most recent host-session open/import elapsed time.
    pub last_session_open_import_elapsed: Option<Duration>,
    /// Most recent first command elapsed time after opening or replacing a worker.
    pub last_first_command_elapsed: Option<Duration>,
    /// Most recent warm command elapsed time on an already-open worker.
    pub last_warm_command_elapsed: Option<Duration>,
    /// Lean-native import attribution for the most recent opened host session, if any.
    pub last_import_stats: Option<LeanWorkerImportStats>,
    /// Streaming requests that entered a worker child.
    pub stream_requests: u64,
    /// Streaming requests that reached terminal success.
    pub stream_successes: u64,
    /// Streaming requests that failed after entering the child.
    pub stream_failures: u64,
    /// Data rows delivered to parent-side sinks.
    pub data_rows_delivered: u64,
    /// Raw row payload bytes delivered to parent-side sinks.
    pub data_row_payload_bytes: u64,
    /// Total elapsed time spent in streaming requests.
    pub stream_elapsed: Duration,
    /// Times the bounded worker-event reader had to wait for the parent to drain events.
    pub backpressure_waits: u64,
    /// Streaming requests that failed after bounded-buffer backpressure was observed.
    pub backpressure_failures: u64,
}
482
/// Compact lifecycle facts for callers that supervise a worker boundary.
///
/// `LeanWorkerStats` remains the detailed counter set. This snapshot is the
/// stable, policy-facing view: a caller can compare two snapshots to observe
/// every restart performed inside the supervisor, including restarts caused by
/// timeout, cancellation, RSS limits, import/request cycling, or explicit
/// cycles.
///
/// Apart from `last_exit`, every field is copied from a `LeanWorkerStats`
/// value when the snapshot is built.
#[derive(Clone, Debug, Eq, PartialEq)]
pub struct LeanWorkerLifecycleSnapshot {
    /// Monotone generation number for the current child. It equals the total
    /// restart count observed by this supervisor (it is populated from
    /// `LeanWorkerStats::restarts`, so it always matches `restarts` below).
    pub worker_generation: u64,
    /// Total restarts performed by the supervisor.
    pub restarts: u64,
    /// Child exits observed by the supervisor, including policy cycles.
    pub exits: u64,
    /// Most recent restart reason, if any.
    pub last_restart_reason: Option<LeanWorkerRestartReason>,
    /// Most recent child exit observed by the supervisor, if any.
    pub last_exit: Option<LeanWorkerExit>,
    /// Last measured child RSS in KiB, when available.
    pub last_rss_kib: Option<u64>,
    /// RSS checks skipped because the platform did not provide a usable sample.
    pub rss_samples_unavailable: u64,
}
508
509impl LeanWorkerLifecycleSnapshot {
510    fn from_worker(stats: &LeanWorkerStats, last_exit: Option<LeanWorkerExit>) -> Self {
511        Self {
512            worker_generation: stats.restarts,
513            restarts: stats.restarts,
514            exits: stats.exits,
515            last_restart_reason: stats.last_restart_reason.clone(),
516            last_exit,
517            last_rss_kib: stats.last_rss_kib,
518            rss_samples_unavailable: stats.rss_samples_unavailable,
519        }
520    }
521}
522
523impl LeanWorkerStats {
524    fn record_restart(&mut self, reason: LeanWorkerRestartReason) {
525        self.restarts = self.restarts.saturating_add(1);
526        match &reason {
527            LeanWorkerRestartReason::Explicit => {
528                self.explicit_cycles = self.explicit_cycles.saturating_add(1);
529            }
530            LeanWorkerRestartReason::MaxRequests { .. } => {
531                self.max_request_restarts = self.max_request_restarts.saturating_add(1);
532            }
533            LeanWorkerRestartReason::MaxImports { .. } => {
534                self.max_import_restarts = self.max_import_restarts.saturating_add(1);
535            }
536            LeanWorkerRestartReason::RssCeiling { .. } => {
537                self.rss_restarts = self.rss_restarts.saturating_add(1);
538            }
539            LeanWorkerRestartReason::RssHardLimit { .. } => {
540                self.rss_restarts = self.rss_restarts.saturating_add(1);
541            }
542            LeanWorkerRestartReason::Idle { .. } => {
543                self.idle_restarts = self.idle_restarts.saturating_add(1);
544            }
545            LeanWorkerRestartReason::Cancelled { .. } => {
546                self.cancelled_restarts = self.cancelled_restarts.saturating_add(1);
547            }
548            LeanWorkerRestartReason::RequestTimeout { .. } => {
549                self.timeout_restarts = self.timeout_restarts.saturating_add(1);
550            }
551            // The general `restarts` counter and `last_restart_reason` already
552            // capture child aborts; `stable_cause() == "child_abort"` keys the
553            // parent's relabel, so no dedicated counter is warranted.
554            LeanWorkerRestartReason::ChildAbort { .. } => {}
555        }
556        self.last_restart_reason = Some(reason);
557    }
558}
559
/// Public lifecycle state for a worker child.
///
/// A child is either still running or has exited; the exited state carries
/// the rendered exit information.
#[derive(Clone, Debug, Eq, PartialEq)]
pub enum LeanWorkerStatus {
    /// The worker process is still running.
    Running,
    /// The worker process has exited; the payload describes how.
    Exited(LeanWorkerExit),
}
568
/// Child-process exit information rendered into plain, comparable values.
#[derive(Clone, Debug, Eq, PartialEq)]
pub struct LeanWorkerExit {
    /// `true` when the child process exited successfully.
    pub success: bool,
    /// Platform exit code, when the platform reports one.
    pub code: Option<i32>,
    /// Process status as rendered by the platform.
    pub status: String,
    /// Child diagnostics captured by the parent, if any were available.
    pub diagnostics: String,
}

impl LeanWorkerExit {
    /// Render a raw `ExitStatus` together with the captured `diagnostics`.
    fn from_status(status: ExitStatus, diagnostics: String) -> Self {
        let success = status.success();
        let code = status.code();
        let status = status.to_string();
        Self {
            success,
            code,
            status,
            diagnostics,
        }
    }
}
592
/// Structured result of shutting down a worker child.
///
/// Combines the path taken to reach a terminal child state (`outcome`), the
/// final exit facts, and the timings observed by the parent.
#[derive(Clone, Debug, Eq, PartialEq)]
pub struct LeanWorkerShutdownReport {
    /// How shutdown reached a terminal child state.
    pub outcome: LeanWorkerShutdownOutcome,
    /// Final child-process exit information.
    pub exit: LeanWorkerExit,
    /// Graceful-shutdown deadline used for this operation.
    pub graceful_timeout: Duration,
    /// Total elapsed shutdown time observed by the parent.
    pub elapsed: Duration,
    /// Time spent after kill escalation, when a kill was needed.
    /// NOTE(review): `None` presumably means no kill escalation happened —
    /// confirm where the report is built.
    pub kill_elapsed: Option<Duration>,
    /// Time spent waiting for the final child exit.
    pub wait_elapsed: Duration,
}
609
/// Shutdown path used to reach a terminal child state.
///
/// `AlreadyExited` and `Graceful` involve no kill; the two `…Killed`
/// variants and `KillOnly` describe paths where the parent killed the child.
#[derive(Clone, Debug, Eq, PartialEq)]
pub enum LeanWorkerShutdownOutcome {
    /// The child had already exited before shutdown was requested.
    AlreadyExited,
    /// The child accepted `Request::Terminate` and exited without escalation.
    Graceful,
    /// The child did not exit before the graceful deadline, so the parent killed it.
    GracefulTimedOutKilled,
    /// The graceful protocol path failed, so the parent killed the child.
    GracefulProtocolFailedKilled,
    /// The caller requested an immediate kill/reap path.
    KillOnly,
}
624
625/// Errors reported by the worker supervisor.
626#[derive(Debug)]
627pub enum LeanWorkerError {
628    /// The worker child could not be spawned.
629    Spawn {
630        executable: PathBuf,
631        source: std::io::Error,
632    },
633    /// The default worker child executable could not be resolved.
634    WorkerChildUnresolved {
635        /// Candidate paths checked by the default resolver.
636        tried: Vec<PathBuf>,
637    },
638    /// The resolved worker child is missing or is not executable.
639    WorkerChildNotExecutable { path: PathBuf, reason: String },
640    /// Worker bootstrap preflight failed before a real command ran.
641    Bootstrap {
642        code: LeanWorkerBootstrapDiagnosticCode,
643        message: String,
644    },
645    /// The capability Lake target could not be built.
646    CapabilityBuild {
647        /// Typed Lake/toolchain diagnostic from `lean-toolchain`.
648        diagnostic: lean_toolchain::LinkDiagnostics,
649    },
650    /// The child process could not be prepared after spawning.
651    Setup { message: String },
652    /// The child did not complete the startup handshake.
653    Handshake { message: String },
654    /// The worker protocol failed after the handshake.
655    Protocol { message: String },
656    /// The child returned a typed worker error.
657    Worker { code: String, message: String },
658    /// The child exited while a request was in flight.
659    ChildExited { exit: LeanWorkerExit },
660    /// The child exited fatally while a request was in flight.
661    ChildPanicOrAbort { exit: LeanWorkerExit },
662    /// A worker operation timed out.
663    Timeout {
664        operation: &'static str,
665        duration: Duration,
666        resource: Box<LeanWorkerResourceExhaustedFacts>,
667    },
668    /// The worker was killed and replaced because an in-flight RSS sample
669    /// crossed the hard parent-side limit.
670    RssHardLimitExceeded {
671        operation: &'static str,
672        current_kib: u64,
673        limit_kib: u64,
674        last_import_stats: Option<Box<LeanWorkerImportStats>>,
675        resource: Box<LeanWorkerResourceExhaustedFacts>,
676    },
677    /// A parent-side cancellation token was observed.
678    Cancelled {
679        operation: &'static str,
680        resource: Box<LeanWorkerResourceExhaustedFacts>,
681    },
682    /// A parent-side progress sink panicked while handling a worker event.
683    ProgressPanic { message: String },
684    /// A parent-side data sink panicked while handling a worker row.
685    DataSinkPanic { message: String },
686    /// A parent-side diagnostic sink panicked while handling a worker diagnostic.
687    DiagnosticSinkPanic { message: String },
688    /// A streaming export returned a nonzero downstream status byte.
689    StreamExportFailed { status: u8 },
690    /// The in-child string callback helper returned a callback failure status.
691    StreamCallbackFailed { status: u8, description: String },
692    /// A streaming callback emitted a malformed row envelope.
693    StreamRowMalformed { message: String },
694    /// A capability metadata export returned malformed JSON.
695    CapabilityMetadataMalformed { message: String },
696    /// Capability metadata did not match the caller's requested expectation.
697    CapabilityMetadataMismatch {
698        export: String,
699        expected: Box<LeanWorkerCapabilityMetadata>,
700        actual: Box<LeanWorkerCapabilityMetadata>,
701    },
702    /// A capability doctor export returned malformed JSON.
703    CapabilityDoctorMalformed { message: String },
704    /// A typed command request could not be serialized as JSON.
705    TypedCommandRequestEncode { export: String, message: String },
706    /// A typed non-streaming command response could not be decoded.
707    TypedCommandResponseDecode { export: String, message: String },
708    /// A typed streaming command row payload could not be decoded.
709    TypedCommandRowDecode {
710        export: String,
711        stream: String,
712        sequence: u64,
713        message: String,
714    },
715    /// A typed streaming command terminal summary could not be decoded.
716    TypedCommandSummaryDecode { export: String, message: String },
717    /// A pool session lease was invalidated by a worker lifecycle transition.
718    LeaseInvalidated { reason: String },
719    /// A local worker pool cannot admit another distinct session key.
720    WorkerPoolExhausted {
721        max_workers: usize,
722        resource: Box<LeanWorkerResourceExhaustedFacts>,
723    },
724    /// A local worker pool cannot admit work without exceeding its RSS budget.
725    WorkerPoolMemoryBudgetExceeded {
726        current_kib: u64,
727        limit_kib: u64,
728        last_import_stats: Option<Box<LeanWorkerImportStats>>,
729        resource: Box<LeanWorkerResourceExhaustedFacts>,
730    },
731    /// Waiting for local worker-pool admission exceeded the configured limit.
732    WorkerPoolQueueTimeout {
733        waited: Duration,
734        resource: Box<LeanWorkerResourceExhaustedFacts>,
735    },
736    /// A supervising policy refused to restart the worker again in its current window.
737    RestartLimitExceeded { restarts: u64, window: Duration },
738    /// The public supervisor does not support the requested operation.
739    UnsupportedRequest { operation: &'static str },
740    /// Waiting for a child process failed.
741    Wait { source: std::io::Error },
742    /// Killing a child process failed.
743    Kill { source: std::io::Error },
744    /// Waiting for a child process exceeded the configured bounded wait.
745    WaitTimeout {
746        operation: &'static str,
747        duration: Duration,
748    },
749    /// The worker has begun shutdown and no longer accepts new requests.
750    ShutdownInProgress { operation: &'static str },
751}
752
753impl fmt::Display for LeanWorkerError {
754    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
755        match self {
756            Self::Spawn { executable, source } => {
757                write!(f, "failed to spawn worker {}: {source}", executable.display())
758            }
759            Self::WorkerChildUnresolved { tried } => {
760                let tried = tried
761                    .iter()
762                    .map(|path| path.display().to_string())
763                    .collect::<Vec<_>>()
764                    .join(", ");
765                write!(
766                    f,
767                    "could not resolve lean-rs-worker-child; set LEAN_RS_WORKER_CHILD or place it beside the current executable (tried: {tried})"
768                )
769            }
770            Self::WorkerChildNotExecutable { path, reason } => {
771                write!(f, "worker child '{}' is not executable: {reason}", path.display())
772            }
773            Self::Bootstrap { code, message } => {
774                write!(f, "worker bootstrap check {code} failed: {message}")
775            }
776            Self::CapabilityBuild { diagnostic } => {
777                write!(f, "worker capability Lake target build failed: {diagnostic}")
778            }
779            Self::Setup { message } => write!(f, "worker child setup failed: {message}"),
780            Self::Handshake { message } => write!(f, "worker handshake failed: {message}"),
781            Self::Protocol { message } => write!(f, "worker protocol failed: {message}"),
782            Self::Worker { code, message } => write!(f, "worker returned {code}: {message}"),
783            Self::ChildExited { exit } => write_exit(f, "worker exited", exit),
784            Self::ChildPanicOrAbort { exit } => write_exit(f, "worker exited fatally", exit),
785            Self::Timeout {
786                operation, duration, ..
787            } => {
788                write!(f, "worker operation {operation} timed out after {duration:?}")
789            }
790            Self::RssHardLimitExceeded {
791                operation,
792                current_kib,
793                limit_kib,
794                last_import_stats,
795                ..
796            } => {
797                write!(
798                    f,
799                    "worker operation {operation} exceeded hard RSS limit; current_kib={current_kib} limit_kib={limit_kib}; {}",
800                    import_stats_diagnostic(last_import_stats.as_deref())
801                )
802            }
803            Self::Cancelled { operation, .. } => write!(f, "worker operation {operation} was cancelled"),
804            Self::ProgressPanic { message } => write!(f, "worker progress sink panicked: {message}"),
805            Self::DataSinkPanic { message } => write!(f, "worker data sink panicked: {message}"),
806            Self::DiagnosticSinkPanic { message } => {
807                write!(f, "worker diagnostic sink panicked: {message}")
808            }
809            Self::StreamExportFailed { status } => write!(f, "streaming export returned status {status}"),
810            Self::StreamCallbackFailed { status, description } => {
811                write!(f, "streaming callback failed with status {status}: {description}")
812            }
813            Self::StreamRowMalformed { message } => write!(f, "streaming export emitted malformed row: {message}"),
814            Self::CapabilityMetadataMalformed { message } => {
815                write!(f, "capability metadata export returned malformed JSON: {message}")
816            }
817            Self::CapabilityMetadataMismatch { export, .. } => {
818                write!(f, "capability metadata from {export} did not match expectation")
819            }
820            Self::CapabilityDoctorMalformed { message } => {
821                write!(f, "capability doctor export returned malformed JSON: {message}")
822            }
823            Self::TypedCommandRequestEncode { export, message } => {
824                write!(f, "typed worker command {export} request JSON encode failed: {message}")
825            }
826            Self::TypedCommandResponseDecode { export, message } => {
827                write!(
828                    f,
829                    "typed worker command {export} response JSON decode failed: {message}"
830                )
831            }
832            Self::TypedCommandRowDecode {
833                export,
834                stream,
835                sequence,
836                message,
837            } => {
838                write!(
839                    f,
840                    "typed worker command {export} row decode failed at stream {stream} sequence {sequence}: {message}"
841                )
842            }
843            Self::TypedCommandSummaryDecode { export, message } => {
844                write!(
845                    f,
846                    "typed worker command {export} terminal summary decode failed: {message}"
847                )
848            }
849            Self::LeaseInvalidated { reason } => write!(f, "worker pool lease was invalidated: {reason}"),
850            Self::WorkerPoolExhausted { max_workers, .. } => {
851                write!(
852                    f,
853                    "worker pool cannot admit another session key; max_workers={max_workers}"
854                )
855            }
856            Self::WorkerPoolMemoryBudgetExceeded {
857                current_kib,
858                limit_kib,
859                last_import_stats,
860                ..
861            } => {
862                write!(
863                    f,
864                    "worker pool cannot admit work within RSS budget; current_kib={current_kib} limit_kib={limit_kib}; {}",
865                    import_stats_diagnostic(last_import_stats.as_deref())
866                )
867            }
868            Self::WorkerPoolQueueTimeout { waited, .. } => {
869                write!(f, "worker pool admission timed out after {waited:?}")
870            }
871            Self::RestartLimitExceeded { restarts, window } => {
872                write!(
873                    f,
874                    "worker restart limit exceeded after {restarts} restarts in {window:?}"
875                )
876            }
877            Self::UnsupportedRequest { operation } => {
878                write!(f, "worker operation {operation} is not supported")
879            }
880            Self::Wait { source } => write!(f, "failed to wait for worker child: {source}"),
881            Self::Kill { source } => write!(f, "failed to kill worker child: {source}"),
882            Self::WaitTimeout { operation, duration } => {
883                write!(
884                    f,
885                    "timed out waiting for worker child during {operation} after {duration:?}"
886                )
887            }
888            Self::ShutdownInProgress { operation } => {
889                write!(
890                    f,
891                    "worker operation {operation} was rejected because shutdown is in progress"
892                )
893            }
894        }
895    }
896}
897
898impl std::error::Error for LeanWorkerError {
899    fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
900        match self {
901            Self::Spawn { source, .. } | Self::Wait { source } | Self::Kill { source } => Some(source),
902            Self::CapabilityBuild { diagnostic } => Some(diagnostic),
903            Self::WorkerChildUnresolved { .. } | Self::WorkerChildNotExecutable { .. } | Self::Bootstrap { .. } => None,
904            Self::Setup { .. }
905            | Self::Handshake { .. }
906            | Self::Protocol { .. }
907            | Self::Worker { .. }
908            | Self::ChildExited { .. }
909            | Self::ChildPanicOrAbort { .. }
910            | Self::Timeout { .. }
911            | Self::RssHardLimitExceeded { .. }
912            | Self::Cancelled { .. }
913            | Self::ProgressPanic { .. }
914            | Self::DataSinkPanic { .. }
915            | Self::DiagnosticSinkPanic { .. }
916            | Self::StreamExportFailed { .. }
917            | Self::StreamCallbackFailed { .. }
918            | Self::StreamRowMalformed { .. }
919            | Self::CapabilityMetadataMalformed { .. }
920            | Self::CapabilityMetadataMismatch { .. }
921            | Self::CapabilityDoctorMalformed { .. }
922            | Self::TypedCommandRequestEncode { .. }
923            | Self::TypedCommandResponseDecode { .. }
924            | Self::TypedCommandRowDecode { .. }
925            | Self::TypedCommandSummaryDecode { .. }
926            | Self::LeaseInvalidated { .. }
927            | Self::WorkerPoolExhausted { .. }
928            | Self::WorkerPoolMemoryBudgetExceeded { .. }
929            | Self::WorkerPoolQueueTimeout { .. }
930            | Self::RestartLimitExceeded { .. }
931            | Self::UnsupportedRequest { .. }
932            | Self::WaitTimeout { .. }
933            | Self::ShutdownInProgress { .. } => None,
934        }
935    }
936}
937
938impl LeanWorkerError {
939    /// Structured resource-boundary facts for errors caused by an admission,
940    /// timeout, cancellation, or RSS limit. `None` means the error is a
941    /// protocol, Lean, build, or lifecycle failure without resource evidence.
942    #[must_use]
943    pub fn resource_exhausted_facts(&self) -> Option<&LeanWorkerResourceExhaustedFacts> {
944        match self {
945            Self::Timeout { resource, .. }
946            | Self::RssHardLimitExceeded { resource, .. }
947            | Self::Cancelled { resource, .. }
948            | Self::WorkerPoolExhausted { resource, .. }
949            | Self::WorkerPoolMemoryBudgetExceeded { resource, .. }
950            | Self::WorkerPoolQueueTimeout { resource, .. } => Some(resource.as_ref()),
951            Self::Spawn { .. }
952            | Self::Kill { .. }
953            | Self::WorkerChildUnresolved { .. }
954            | Self::WorkerChildNotExecutable { .. }
955            | Self::Bootstrap { .. }
956            | Self::CapabilityBuild { .. }
957            | Self::Setup { .. }
958            | Self::Handshake { .. }
959            | Self::Protocol { .. }
960            | Self::Worker { .. }
961            | Self::ChildExited { .. }
962            | Self::ChildPanicOrAbort { .. }
963            | Self::ProgressPanic { .. }
964            | Self::DataSinkPanic { .. }
965            | Self::DiagnosticSinkPanic { .. }
966            | Self::StreamExportFailed { .. }
967            | Self::StreamCallbackFailed { .. }
968            | Self::StreamRowMalformed { .. }
969            | Self::CapabilityMetadataMalformed { .. }
970            | Self::CapabilityMetadataMismatch { .. }
971            | Self::CapabilityDoctorMalformed { .. }
972            | Self::TypedCommandRequestEncode { .. }
973            | Self::TypedCommandResponseDecode { .. }
974            | Self::TypedCommandRowDecode { .. }
975            | Self::TypedCommandSummaryDecode { .. }
976            | Self::LeaseInvalidated { .. }
977            | Self::RestartLimitExceeded { .. }
978            | Self::UnsupportedRequest { .. }
979            | Self::WaitTimeout { .. }
980            | Self::ShutdownInProgress { .. }
981            | Self::Wait { .. } => None,
982        }
983    }
984}
985
/// Convert a [`Duration`] to whole milliseconds, saturating at `u64::MAX`
/// when the millisecond count does not fit in a `u64`.
fn duration_ms(duration: Duration) -> u64 {
    let millis: u128 = duration.as_millis();
    match u64::try_from(millis) {
        Ok(fits) => fits,
        Err(_) => u64::MAX,
    }
}
989
990#[allow(
991    clippy::too_many_arguments,
992    reason = "flat fact construction mirrors the public diagnostic payload"
993)]
994fn resource_facts(
995    cause: impl Into<String>,
996    work_entered_child: bool,
997    operation: Option<&'static str>,
998    current_rss_kib: Option<u64>,
999    limit_kib: Option<u64>,
1000    import_count: Option<u64>,
1001    worker_generation: Option<u64>,
1002    restart_reason: Option<String>,
1003    queue_wait: Option<Duration>,
1004    duration: Option<Duration>,
1005    cold_open_attempts: Option<u64>,
1006    cold_open_admitted: Option<u64>,
1007    cold_open_refusals: Option<u64>,
1008    import_like_requests: Option<u64>,
1009    import_like_admitted: Option<u64>,
1010    last_import_stats: Option<LeanWorkerImportStats>,
1011) -> LeanWorkerResourceExhaustedFacts {
1012    LeanWorkerResourceExhaustedFacts {
1013        cause: cause.into(),
1014        work_entered_child,
1015        operation: operation.map(str::to_owned),
1016        current_rss_kib,
1017        limit_kib,
1018        import_count,
1019        worker_generation,
1020        restart_reason,
1021        queue_wait_ms: queue_wait.map(duration_ms),
1022        duration_ms: duration.map(duration_ms),
1023        cold_open_attempts,
1024        cold_open_admitted,
1025        cold_open_refusals,
1026        import_like_requests,
1027        import_like_admitted,
1028        last_import_stats,
1029    }
1030}
1031
1032fn worker_resource_facts(
1033    cause: impl Into<String>,
1034    work_entered_child: bool,
1035    operation: Option<&'static str>,
1036    stats: &LeanWorkerStats,
1037    current_rss_kib: Option<u64>,
1038    limit_kib: Option<u64>,
1039    duration: Option<Duration>,
1040) -> LeanWorkerResourceExhaustedFacts {
1041    resource_facts(
1042        cause,
1043        work_entered_child,
1044        operation,
1045        current_rss_kib,
1046        limit_kib,
1047        Some(stats.imports),
1048        Some(stats.restarts),
1049        stats
1050            .last_restart_reason
1051            .as_ref()
1052            .map(LeanWorkerRestartReason::stable_cause)
1053            .map(str::to_owned),
1054        None,
1055        duration,
1056        None,
1057        None,
1058        None,
1059        Some(stats.import_like_admission_attempts),
1060        Some(stats.import_like_admitted),
1061        stats.last_import_stats.clone(),
1062    )
1063}
1064
/// Counter newtype identifying a worker generation; defaults to zero.
#[derive(Debug, Default, Clone, Copy, PartialEq, Eq)]
struct WorkerGeneration(u64);

impl WorkerGeneration {
    /// Return the following generation, saturating at `u64::MAX` instead of
    /// wrapping around.
    fn next(self) -> Self {
        let Self(current) = self;
        Self(current.saturating_add(1))
    }
}
1073
/// Counter newtype identifying a request issued by the supervisor; defaults
/// to zero.
#[derive(Debug, Default, Clone, Copy, PartialEq, Eq)]
struct WorkerRequestId(u64);

impl WorkerRequestId {
    /// Return the following request id, saturating at `u64::MAX` instead of
    /// wrapping around.
    fn next(self) -> Self {
        let Self(current) = self;
        Self(current.saturating_add(1))
    }
}
1082
1083#[derive(Clone, Debug, Eq, PartialEq)]
1084struct InFlightRequest {
1085    id: WorkerRequestId,
1086    operation: &'static str,
1087    generation: WorkerGeneration,
1088}
1089
1090#[derive(Clone, Debug, Eq, PartialEq)]
1091enum WorkerSupervisorState {
1092    Idle { generation: WorkerGeneration },
1093    Busy { request: InFlightRequest },
1094    Streaming { request: InFlightRequest },
1095    Stopping { generation: WorkerGeneration },
1096    Killing { generation: WorkerGeneration },
1097    Reaping { generation: WorkerGeneration },
1098    Crashed { generation: WorkerGeneration },
1099    RestartExhausted { generation: WorkerGeneration },
1100    Exited { generation: WorkerGeneration },
1101}
1102
1103impl WorkerSupervisorState {
1104    fn rejects_new_requests(&self) -> bool {
1105        matches!(
1106            self,
1107            Self::Stopping { .. } | Self::Killing { .. } | Self::Reaping { .. } | Self::RestartExhausted { .. }
1108        )
1109    }
1110
1111    fn current_operation(&self) -> Option<&'static str> {
1112        match self {
1113            Self::Busy { request } | Self::Streaming { request } => Some(request.operation),
1114            Self::Idle { .. }
1115            | Self::Stopping { .. }
1116            | Self::Killing { .. }
1117            | Self::Reaping { .. }
1118            | Self::Crashed { .. }
1119            | Self::RestartExhausted { .. }
1120            | Self::Exited { .. } => None,
1121        }
1122    }
1123
1124    fn current_request_id(&self) -> Option<WorkerRequestId> {
1125        match self {
1126            Self::Busy { request } | Self::Streaming { request } => Some(request.id),
1127            Self::Idle { .. }
1128            | Self::Stopping { .. }
1129            | Self::Killing { .. }
1130            | Self::Reaping { .. }
1131            | Self::Crashed { .. }
1132            | Self::RestartExhausted { .. }
1133            | Self::Exited { .. } => None,
1134        }
1135    }
1136}
1137
1138/// Supervisor for one `lean-rs-worker` child process.
1139///
1140/// Dropping a live supervisor starts the same bounded shutdown path as
1141/// explicit shutdown, but cannot report kill or wait failures. Call
1142/// [`LeanWorker::shutdown`] when callers need structured exit status.
1143#[derive(Debug)]
1144pub struct LeanWorker {
1145    config: LeanWorkerConfig,
1146    child: Option<Child>,
1147    stdin: Option<BufWriter<ChildStdin>>,
1148    stdout: Option<BufReader<ChildStdout>>,
1149    stderr: Option<ChildStderr>,
1150    last_exit: Option<LeanWorkerExit>,
1151    runtime_metadata: LeanWorkerRuntimeMetadata,
1152    stats: LeanWorkerStats,
1153    requests_since_restart: u64,
1154    imports_since_restart: u64,
1155    last_activity: Instant,
1156    generation: WorkerGeneration,
1157    next_request_id: WorkerRequestId,
1158    restart_window: VecDeque<Instant>,
1159    state: WorkerSupervisorState,
1160}
1161
1162#[allow(
1163    clippy::wildcard_enum_match_arm,
1164    reason = "Response and Message are #[non_exhaustive] across the lean-rs-worker-protocol crate boundary; every wildcard arm here uniformly converts an unexpected variant into a protocol-level error rather than enumerating each known variant"
1165)]
1166impl LeanWorker {
1167    /// Spawn a worker child and wait for its protocol handshake.
1168    ///
1169    /// # Errors
1170    ///
1171    /// Returns `LeanWorkerError` if the child cannot be spawned, child setup
1172    /// fails, the child exits before handshaking, or the startup timeout
1173    /// expires.
1174    pub fn spawn(config: &LeanWorkerConfig) -> Result<Self, LeanWorkerError> {
1175        let spawn_started = Instant::now();
1176        let mut command = Command::new(&config.executable);
1177        command
1178            .stdin(Stdio::piped())
1179            .stdout(Stdio::piped())
1180            .stderr(Stdio::piped())
1181            .env("LEAN_ABORT_ON_PANIC", "1")
1182            .env("LEAN_BACKTRACE", "0")
1183            .env("RUST_BACKTRACE", "0");
1184
1185        if let Some(current_dir) = &config.current_dir {
1186            command.current_dir(current_dir);
1187        }
1188        for (key, value) in &config.env {
1189            command.env(key, value);
1190        }
1191
1192        let mut child = command.spawn().map_err(|source| LeanWorkerError::Spawn {
1193            executable: config.executable.clone(),
1194            source,
1195        })?;
1196
1197        let mut stdin = child
1198            .stdin
1199            .take()
1200            .map(BufWriter::new)
1201            .ok_or_else(|| LeanWorkerError::Setup {
1202                message: "child stdin unavailable".to_owned(),
1203            })?;
1204        let stdout = child.stdout.take().ok_or_else(|| LeanWorkerError::Setup {
1205            message: "child stdout unavailable".to_owned(),
1206        })?;
1207        let stderr = child.stderr.take();
1208
1209        let max_frame_bytes = config.max_frame_bytes;
1210        let (sender, receiver) = mpsc::channel();
1211        let _handshake_reader = thread::spawn(move || {
1212            let mut stdout = BufReader::new(stdout);
1213            let result = expect_handshake(&mut stdout, max_frame_bytes);
1214            drop(sender.send((stdout, result)));
1215        });
1216
1217        let (stdout, runtime_metadata) = match receiver.recv_timeout(config.startup_timeout) {
1218            Ok((stdout, Ok(metadata))) => (stdout, metadata),
1219            Ok((_stdout, Err(_handshake_err))) => {
1220                // The handshake-thread observed a protocol-level error reading
1221                // the child's first frame. In practice this means the child is
1222                // mid-`abort()` and hasn't quite died yet—using `try_wait`
1223                // (non-blocking) here loses the race and drops the child's
1224                // stderr. Kill if still alive, then go through the canonical
1225                // post-mortem path so `LeanWorkerExit.diagnostics` carries the
1226                // bootstrap stderr with the same diagnostic contract as a runtime crash.
1227                drop(child.kill());
1228                let exit = wait_with_stderr(&mut child, stderr)?;
1229                return Err(if exit.success {
1230                    LeanWorkerError::ChildExited { exit }
1231                } else {
1232                    LeanWorkerError::ChildPanicOrAbort { exit }
1233                });
1234            }
1235            Err(mpsc::RecvTimeoutError::Timeout) => {
1236                drop(child.kill());
1237                let _exit = wait_with_stderr(&mut child, stderr)?;
1238                return Err(LeanWorkerError::Timeout {
1239                    operation: "startup",
1240                    duration: config.startup_timeout,
1241                    resource: Box::new(resource_facts(
1242                        "worker_timeout",
1243                        false,
1244                        Some("startup"),
1245                        None,
1246                        None,
1247                        None,
1248                        None,
1249                        None,
1250                        None,
1251                        Some(config.startup_timeout),
1252                        None,
1253                        None,
1254                        None,
1255                        None,
1256                        None,
1257                        None,
1258                    )),
1259                });
1260            }
1261            Err(mpsc::RecvTimeoutError::Disconnected) => {
1262                return Err(LeanWorkerError::Handshake {
1263                    message: "handshake reader exited without a result".to_owned(),
1264                });
1265            }
1266        };
1267
1268        // Negotiate the per-connection frame cap to the child. The child
1269        // blocks on this frame after sending its handshake; until it lands,
1270        // no Request frame can be sent.
1271        write_frame(
1272            &mut stdin,
1273            Message::ConfigureFrameLimit { max_frame_bytes },
1274            max_frame_bytes,
1275        )
1276        .map_err(|err| LeanWorkerError::Protocol {
1277            message: format!("failed to send ConfigureFrameLimit: {err}"),
1278        })?;
1279
1280        let spawn_handshake_elapsed = spawn_started.elapsed();
1281        Ok(Self {
1282            config: config.clone(),
1283            child: Some(child),
1284            stdin: Some(stdin),
1285            stdout: Some(stdout),
1286            stderr,
1287            last_exit: None,
1288            runtime_metadata,
1289            stats: LeanWorkerStats {
1290                last_spawn_handshake_elapsed: Some(spawn_handshake_elapsed),
1291                ..LeanWorkerStats::default()
1292            },
1293            requests_since_restart: 0,
1294            imports_since_restart: 0,
1295            last_activity: Instant::now(),
1296            generation: WorkerGeneration::default(),
1297            next_request_id: WorkerRequestId::default(),
1298            restart_window: VecDeque::new(),
1299            state: WorkerSupervisorState::Idle {
1300                generation: WorkerGeneration::default(),
1301            },
1302        })
1303    }
1304
1305    /// Check whether the worker responds to requests.
1306    ///
1307    /// # Errors
1308    ///
1309    /// Returns `LeanWorkerError` if the worker is dead, the protocol fails, or
1310    /// the child returns a typed worker error.
1311    pub fn health(&mut self) -> Result<(), LeanWorkerError> {
1312        self.prepare_request(false)?;
1313        self.send_request(Request::Health)?;
1314        self.record_request(false);
1315        match self.read_response("health")? {
1316            Response::HealthOk => Ok(()),
1317            other => Err(unexpected_response("health", &other)),
1318        }
1319    }
1320
1321    /// Load the in-tree fixture capability in the worker child.
1322    ///
1323    /// This is a fixture-only entry point used to exercise the supervisor path
1324    /// in tests. The supported public path is `open_session`, which returns the
1325    /// host-session adapter instead of expanding this fixture surface.
1326    ///
1327    /// # Errors
1328    ///
1329    /// Returns `LeanWorkerError` if the worker is dead, fixture loading fails,
1330    /// or protocol communication fails.
1331    pub fn load_fixture_capability(&mut self, fixture_root: impl AsRef<Path>) -> Result<(), LeanWorkerError> {
1332        let manifest_path = fixture_capability_manifest(fixture_root.as_ref())?;
1333        self.prepare_request(true)?;
1334        self.send_request(Request::LoadFixtureCapability {
1335            manifest_path: path_string(&manifest_path),
1336        })?;
1337        self.record_request(true);
1338        match self.read_response("load_fixture_capability")? {
1339            Response::CapabilityLoaded => Ok(()),
1340            other => Err(unexpected_response("load_fixture_capability", &other)),
1341        }
1342    }
1343
1344    /// Call the fixture multiplication export in the worker child.
1345    ///
1346    /// # Errors
1347    ///
1348    /// Returns `LeanWorkerError` if the worker is dead, the export fails, or
1349    /// protocol communication fails.
1350    pub fn call_fixture_mul(
1351        &mut self,
1352        fixture_root: impl AsRef<Path>,
1353        lhs: u64,
1354        rhs: u64,
1355    ) -> Result<u64, LeanWorkerError> {
1356        let manifest_path = fixture_capability_manifest(fixture_root.as_ref())?;
1357        self.prepare_request(true)?;
1358        self.send_request(Request::CallFixtureMul {
1359            manifest_path: path_string(&manifest_path),
1360            lhs,
1361            rhs,
1362        })?;
1363        self.record_request(true);
1364        match self.read_response("call_fixture_mul")? {
1365            Response::U64 { value } => Ok(value),
1366            other => Err(unexpected_response("call_fixture_mul", &other)),
1367        }
1368    }
1369
1370    /// Return the current worker lifecycle status.
1371    ///
1372    /// # Errors
1373    ///
1374    /// Returns `LeanWorkerError` if checking the process status fails.
1375    pub fn status(&mut self) -> Result<LeanWorkerStatus, LeanWorkerError> {
1376        if let Some(exit) = &self.last_exit {
1377            self.state = WorkerSupervisorState::Exited {
1378                generation: self.generation,
1379            };
1380            return Ok(LeanWorkerStatus::Exited(exit.clone()));
1381        }
1382        let Some(child) = self.child.as_mut() else {
1383            self.state = WorkerSupervisorState::Exited {
1384                generation: self.generation,
1385            };
1386            return Ok(LeanWorkerStatus::Exited(LeanWorkerExit {
1387                success: false,
1388                code: None,
1389                status: "worker is not running".to_owned(),
1390                diagnostics: String::new(),
1391            }));
1392        };
1393        match child.try_wait().map_err(|source| LeanWorkerError::Wait { source })? {
1394            Some(status) => {
1395                let diagnostics = self.read_stderr();
1396                let exit = LeanWorkerExit::from_status(status, diagnostics);
1397                self.last_exit = Some(exit.clone());
1398                self.child = None;
1399                self.stdin = None;
1400                self.stdout = None;
1401                self.stats.exits = self.stats.exits.saturating_add(1);
1402                self.state = WorkerSupervisorState::Exited {
1403                    generation: self.generation,
1404                };
1405                Ok(LeanWorkerStatus::Exited(exit))
1406            }
1407            None => Ok(LeanWorkerStatus::Running),
1408        }
1409    }
1410
1411    /// Return lifecycle counters for this supervisor.
1412    #[must_use]
1413    pub fn stats(&self) -> LeanWorkerStats {
1414        self.stats.clone()
1415    }
1416
1417    pub(crate) fn record_capability_open_timing(
1418        &mut self,
1419        capability_load_elapsed: Duration,
1420        session_open_import_elapsed: Duration,
1421    ) {
1422        self.stats.last_capability_load_elapsed = Some(capability_load_elapsed);
1423        self.stats.last_session_open_import_elapsed = Some(session_open_import_elapsed);
1424        if let Some(timing) = self.stats.last_replacement_timing.as_mut() {
1425            timing.capability_load = capability_load_elapsed;
1426            timing.session_open_import = session_open_import_elapsed;
1427        }
1428    }
1429
1430    pub(crate) fn record_command_timing(&mut self, first_command_after_open: bool, elapsed: Duration) {
1431        if first_command_after_open {
1432            self.stats.last_first_command_elapsed = Some(elapsed);
1433            if let Some(timing) = self.stats.last_replacement_timing.as_mut() {
1434                timing.first_command = Some(elapsed);
1435            }
1436        } else {
1437            self.stats.last_warm_command_elapsed = Some(elapsed);
1438            if let Some(timing) = self.stats.last_replacement_timing.as_mut() {
1439                timing.warm_command = Some(elapsed);
1440            }
1441        }
1442    }
1443
1444    /// Return policy-facing lifecycle facts for this supervisor.
1445    #[must_use]
1446    pub fn lifecycle_snapshot(&self) -> LeanWorkerLifecycleSnapshot {
1447        LeanWorkerLifecycleSnapshot::from_worker(&self.stats, self.last_exit.clone())
1448    }
1449
1450    /// Return protocol/runtime facts reported by the worker child.
1451    #[must_use]
1452    pub fn runtime_metadata(&self) -> LeanWorkerRuntimeMetadata {
1453        self.runtime_metadata.clone()
1454    }
1455
1456    /// Measure the current child RSS in KiB when supported by the platform.
1457    ///
1458    /// This is an observability hook for restart policy and memory-cycling
1459    /// workloads. A `None` result means the platform did not provide a usable
1460    /// sample; it is not a worker failure.
1461    pub fn rss_kib(&mut self) -> Option<u64> {
1462        match self.child_rss_kib() {
1463            Some(value) => {
1464                self.stats.last_rss_kib = Some(value);
1465                Some(value)
1466            }
1467            None => {
1468                self.stats.rss_samples_unavailable = self.stats.rss_samples_unavailable.saturating_add(1);
1469                None
1470            }
1471        }
1472    }
1473
1474    /// Return the timeout used for subsequent worker requests.
1475    #[must_use]
1476    pub fn request_timeout(&self) -> Duration {
1477        self.config.request_timeout
1478    }
1479
1480    /// Change the timeout for subsequent worker requests.
1481    ///
1482    /// This changes supervisor policy only. The supervisor still owns the
1483    /// deadline, child kill, replacement, and restart accounting.
1484    pub fn set_request_timeout(&mut self, timeout: Duration) {
1485        self.config.request_timeout = timeout;
1486    }
1487
1488    /// Explicitly cycle the worker process.
1489    ///
1490    /// This is the manual memory-reset operation. It terminates the current
1491    /// child, starts a replacement with the original configuration, and records
1492    /// `LeanWorkerRestartReason::Explicit`.
1493    ///
1494    /// # Errors
1495    ///
1496    /// Returns `LeanWorkerError` if the existing child cannot be waited on or
1497    /// the replacement child cannot be spawned and handshaken.
1498    pub fn cycle(&mut self) -> Result<(), LeanWorkerError> {
1499        self.restart_with_reason(LeanWorkerRestartReason::Explicit)
1500    }
1501
1502    pub(crate) fn cycle_with_restart_reason(&mut self, reason: LeanWorkerRestartReason) -> Result<(), LeanWorkerError> {
1503        self.restart_with_reason(reason)
1504    }
1505
1506    /// Restart this worker using its original configuration.
1507    ///
1508    /// This is an explicit lifecycle operation. Prompt 58 adds policy-driven
1509    /// restarts for memory cycling; this method only gives callers a direct
1510    /// reset point.
1511    ///
1512    /// # Errors
1513    ///
1514    /// Returns `LeanWorkerError` if the existing child cannot be waited on or
1515    /// the replacement child cannot be spawned and handshaken.
1516    pub fn restart(&mut self) -> Result<(), LeanWorkerError> {
1517        self.cycle()
1518    }
1519
1520    #[doc(hidden)]
1521    /// Kill the child process for supervisor tests.
1522    ///
1523    /// # Errors
1524    ///
1525    /// Returns `LeanWorkerError` if the worker is already dead or the OS kill
1526    /// request fails.
1527    pub fn __kill_for_test(&mut self) -> Result<(), LeanWorkerError> {
1528        let Some(child) = self.child.as_mut() else {
1529            return Err(self.dead_error());
1530        };
1531        child.kill().map_err(|source| LeanWorkerError::Kill { source })?;
1532        Ok(())
1533    }
1534
1535    #[doc(hidden)]
1536    /// Return the child process id for supervisor tests.
1537    #[must_use]
1538    pub fn __child_pid_for_test(&self) -> Option<u32> {
1539        self.child.as_ref().map(Child::id)
1540    }
1541
1542    /// Ask the child to terminate cleanly and wait for it.
1543    ///
1544    /// # Errors
1545    ///
1546    /// Returns `LeanWorkerError` if the worker is already dead, the protocol
1547    /// fails, or waiting for the child process fails.
1548    #[deprecated(note = "use LeanWorker::shutdown for structured shutdown status")]
1549    pub fn terminate(self) -> Result<LeanWorkerExit, LeanWorkerError> {
1550        self.shutdown().map(|report| report.exit)
1551    }
1552
1553    /// Shut down the worker child, escalating to kill after a bounded grace period.
1554    ///
1555    /// # Errors
1556    ///
1557    /// Returns `LeanWorkerError` if the terminate request cannot be written,
1558    /// kill escalation fails, or the child cannot be reaped within the bounded
1559    /// wait.
1560    pub fn shutdown(mut self) -> Result<LeanWorkerShutdownReport, LeanWorkerError> {
1561        self.shutdown_child(ShutdownIntent::Graceful)
1562    }
1563
1564    #[doc(hidden)]
1565    /// Trigger the fixture panic path.
1566    ///
1567    /// # Errors
1568    ///
1569    /// Returns `LeanWorkerError` if the worker does not exit fatally or if the
1570    /// protocol fails before the panic path runs.
1571    pub fn __trigger_lean_panic_fixture(
1572        mut self,
1573        fixture_root: impl AsRef<Path>,
1574    ) -> Result<LeanWorkerExit, LeanWorkerError> {
1575        let manifest_path = fixture_capability_manifest(fixture_root.as_ref())?;
1576        self.prepare_request(true)?;
1577        self.send_request(Request::TriggerLeanPanic {
1578            manifest_path: path_string(&manifest_path),
1579        })?;
1580        self.record_request(true);
1581        match self.read_response("trigger_lean_panic") {
1582            Ok(response) => Err(unexpected_response("trigger_lean_panic", &response)),
1583            Err(LeanWorkerError::ChildPanicOrAbort { exit }) => Ok(exit),
1584            Err(err) => Err(err),
1585        }
1586    }
1587
1588    #[doc(hidden)]
1589    /// Emit synthetic worker data rows through the private protocol for row sink tests.
1590    ///
1591    /// # Errors
1592    ///
1593    /// Returns `LeanWorkerError` if the worker is dead, the sink panics,
1594    /// cancellation is observed, or protocol communication fails.
1595    pub fn __emit_test_rows(
1596        &mut self,
1597        streams: Vec<String>,
1598        cancellation: Option<&LeanWorkerCancellationToken>,
1599        data: Option<&dyn LeanWorkerDataSink>,
1600    ) -> Result<u64, LeanWorkerError> {
1601        const OPERATION: &str = "emit_test_rows";
1602        check_cancelled(OPERATION, cancellation)?;
1603        self.prepare_request(false)?;
1604        self.send_request(Request::EmitTestRows { streams })?;
1605        self.record_request(false);
1606        match self.read_response_with_events(
1607            OPERATION,
1608            None,
1609            cancellation,
1610            data.map(LeanWorkerDataSinkTarget::Value),
1611            None,
1612        )? {
1613            Response::RowsComplete { count } => Ok(count),
1614            other => Err(unexpected_response(OPERATION, &other)),
1615        }
1616    }
1617
1618    #[doc(hidden)]
1619    /// Emit synthetic data rows and then let the fake child exit without a terminal response.
1620    ///
1621    /// # Errors
1622    ///
1623    /// Returns the terminal worker failure observed after tentative row delivery.
1624    pub fn __emit_test_rows_then_exit(
1625        &mut self,
1626        cancellation: Option<&LeanWorkerCancellationToken>,
1627        data: Option<&dyn LeanWorkerDataSink>,
1628    ) -> Result<(), LeanWorkerError> {
1629        const OPERATION: &str = "emit_test_rows_then_exit";
1630        check_cancelled(OPERATION, cancellation)?;
1631        self.prepare_request(false)?;
1632        self.send_request(Request::EmitTestRowsThenExit)?;
1633        self.record_request(false);
1634        let response = self.read_response_with_events(
1635            OPERATION,
1636            None,
1637            cancellation,
1638            data.map(LeanWorkerDataSinkTarget::Value),
1639            None,
1640        )?;
1641        Err(unexpected_response(OPERATION, &response))
1642    }
1643
1644    #[doc(hidden)]
1645    /// Emit synthetic data rows and then abort the fake child before terminal success.
1646    ///
1647    /// # Errors
1648    ///
1649    /// Returns the terminal worker failure observed after tentative row delivery.
1650    pub fn __emit_test_rows_then_panic(
1651        &mut self,
1652        cancellation: Option<&LeanWorkerCancellationToken>,
1653        data: Option<&dyn LeanWorkerDataSink>,
1654    ) -> Result<(), LeanWorkerError> {
1655        const OPERATION: &str = "emit_test_rows_then_panic";
1656        check_cancelled(OPERATION, cancellation)?;
1657        self.prepare_request(false)?;
1658        self.send_request(Request::EmitTestRowsThenPanic)?;
1659        self.record_request(false);
1660        let response = self.read_response_with_events(
1661            OPERATION,
1662            None,
1663            cancellation,
1664            data.map(LeanWorkerDataSinkTarget::Value),
1665            None,
1666        )?;
1667        Err(unexpected_response(OPERATION, &response))
1668    }
1669
1670    pub(crate) fn open_worker_session(
1671        &mut self,
1672        config: &LeanWorkerSessionConfig,
1673        cancellation: Option<&LeanWorkerCancellationToken>,
1674        progress: Option<&dyn LeanWorkerProgressSink>,
1675    ) -> Result<(), LeanWorkerError> {
1676        const OPERATION: &str = "open_worker_session";
1677        check_cancelled(OPERATION, cancellation)?;
1678        let before_restarts = self.stats.restarts;
1679        self.prepare_request(true)?;
1680        let import_started = Instant::now();
1681        let mode = match config.mode() {
1682            LeanWorkerSessionMode::Capability {
1683                package,
1684                lib_name,
1685                manifest_path,
1686            } => HostSessionMode::Capability {
1687                package: package.clone(),
1688                lib_name: lib_name.clone(),
1689                manifest_path: manifest_path.as_ref().map(|path| path_string(path)),
1690            },
1691            LeanWorkerSessionMode::ShimsOnly => HostSessionMode::ShimsOnly,
1692        };
1693        self.send_request(Request::OpenHostSession {
1694            project_root: config.project_root_string(),
1695            mode,
1696            imports: config.imports().to_vec(),
1697            import_profile: config.import_profile(),
1698        })?;
1699        self.record_request(true);
1700        match self.read_response_with_progress(OPERATION, progress, cancellation)? {
1701            Response::HostSessionOpened { import_stats } => {
1702                let session_open_import_elapsed = import_started.elapsed();
1703                self.stats.last_session_open_import_elapsed = Some(session_open_import_elapsed);
1704                if self.stats.restarts > before_restarts
1705                    && let Some(timing) = self.stats.last_replacement_timing.as_mut()
1706                {
1707                    timing.session_open_import = session_open_import_elapsed;
1708                }
1709                self.stats.last_import_stats = Some(import_stats);
1710                Ok(())
1711            }
1712            other => Err(unexpected_response(OPERATION, &other)),
1713        }
1714    }
1715
1716    #[expect(
1717        clippy::wildcard_enum_match_arm,
1718        reason = "round_trip deliberately collapses per-method Response wildcards into a uniform unexpected_response branch; a new variant surfaces at runtime, not compile time"
1719    )]
1720    pub(crate) fn worker_elaborate(
1721        &mut self,
1722        source: &str,
1723        options: &LeanWorkerElabOptions,
1724        cancellation: Option<&LeanWorkerCancellationToken>,
1725        progress: Option<&dyn LeanWorkerProgressSink>,
1726    ) -> Result<LeanWorkerElabResult, LeanWorkerError> {
1727        self.round_trip(
1728            "worker_elaborate",
1729            Request::Elaborate {
1730                source: source.to_owned(),
1731                options: options.clone(),
1732            },
1733            false,
1734            cancellation,
1735            progress,
1736            |response, operation| match response {
1737                Response::Elaboration { outcome } => Ok(outcome),
1738                other => Err(unexpected_response(operation, &other)),
1739            },
1740        )
1741    }
1742
1743    #[expect(
1744        clippy::wildcard_enum_match_arm,
1745        reason = "round_trip deliberately collapses per-method Response wildcards into a uniform unexpected_response branch; a new variant surfaces at runtime, not compile time"
1746    )]
1747    pub(crate) fn worker_kernel_check(
1748        &mut self,
1749        source: &str,
1750        options: &LeanWorkerElabOptions,
1751        cancellation: Option<&LeanWorkerCancellationToken>,
1752        progress: Option<&dyn LeanWorkerProgressSink>,
1753    ) -> Result<LeanWorkerKernelResult, LeanWorkerError> {
1754        self.round_trip(
1755            "worker_kernel_check",
1756            Request::KernelCheck {
1757                source: source.to_owned(),
1758                options: options.clone(),
1759                progress: progress.is_some(),
1760            },
1761            false,
1762            cancellation,
1763            progress,
1764            |response, operation| match response {
1765                Response::KernelCheck { outcome } => Ok(outcome),
1766                other => Err(unexpected_response(operation, &other)),
1767            },
1768        )
1769    }
1770
1771    #[expect(
1772        clippy::wildcard_enum_match_arm,
1773        reason = "round_trip deliberately collapses per-method Response wildcards into a uniform unexpected_response branch; a new variant surfaces at runtime, not compile time"
1774    )]
1775    pub(crate) fn worker_declaration_kinds(
1776        &mut self,
1777        names: &[&str],
1778        cancellation: Option<&LeanWorkerCancellationToken>,
1779        progress: Option<&dyn LeanWorkerProgressSink>,
1780    ) -> Result<Vec<String>, LeanWorkerError> {
1781        self.round_trip(
1782            "worker_declaration_kinds",
1783            Request::DeclarationKinds {
1784                names: names.iter().map(|name| (*name).to_owned()).collect(),
1785                progress: progress.is_some(),
1786            },
1787            false,
1788            cancellation,
1789            progress,
1790            |response, operation| match response {
1791                Response::Strings { values } => Ok(values),
1792                other => Err(unexpected_response(operation, &other)),
1793            },
1794        )
1795    }
1796
1797    #[expect(
1798        clippy::wildcard_enum_match_arm,
1799        reason = "round_trip deliberately collapses per-method Response wildcards into a uniform unexpected_response branch; a new variant surfaces at runtime, not compile time"
1800    )]
1801    pub(crate) fn worker_declaration_names(
1802        &mut self,
1803        names: &[&str],
1804        cancellation: Option<&LeanWorkerCancellationToken>,
1805        progress: Option<&dyn LeanWorkerProgressSink>,
1806    ) -> Result<Vec<String>, LeanWorkerError> {
1807        self.round_trip(
1808            "worker_declaration_names",
1809            Request::DeclarationNames {
1810                names: names.iter().map(|name| (*name).to_owned()).collect(),
1811                progress: progress.is_some(),
1812            },
1813            false,
1814            cancellation,
1815            progress,
1816            |response, operation| match response {
1817                Response::Strings { values } => Ok(values),
1818                other => Err(unexpected_response(operation, &other)),
1819            },
1820        )
1821    }
1822
1823    #[expect(
1824        clippy::wildcard_enum_match_arm,
1825        reason = "round_trip deliberately collapses per-method Response wildcards into a uniform unexpected_response branch; a new variant surfaces at runtime, not compile time"
1826    )]
1827    pub(crate) fn worker_infer_type(
1828        &mut self,
1829        source: &str,
1830        options: &LeanWorkerElabOptions,
1831        cancellation: Option<&LeanWorkerCancellationToken>,
1832        progress: Option<&dyn LeanWorkerProgressSink>,
1833    ) -> Result<LeanWorkerMetaResult<LeanWorkerRendered>, LeanWorkerError> {
1834        self.round_trip(
1835            "worker_infer_type",
1836            Request::InferType {
1837                source: source.to_owned(),
1838                options: options.clone(),
1839            },
1840            false,
1841            cancellation,
1842            progress,
1843            |response, operation| match response {
1844                Response::MetaExpr { result } => Ok(result),
1845                other => Err(unexpected_response(operation, &other)),
1846            },
1847        )
1848    }
1849
1850    #[expect(
1851        clippy::wildcard_enum_match_arm,
1852        reason = "round_trip deliberately collapses per-method Response wildcards into a uniform unexpected_response branch; a new variant surfaces at runtime, not compile time"
1853    )]
1854    pub(crate) fn worker_whnf(
1855        &mut self,
1856        source: &str,
1857        options: &LeanWorkerElabOptions,
1858        cancellation: Option<&LeanWorkerCancellationToken>,
1859        progress: Option<&dyn LeanWorkerProgressSink>,
1860    ) -> Result<LeanWorkerMetaResult<LeanWorkerRendered>, LeanWorkerError> {
1861        self.round_trip(
1862            "worker_whnf",
1863            Request::Whnf {
1864                source: source.to_owned(),
1865                options: options.clone(),
1866            },
1867            false,
1868            cancellation,
1869            progress,
1870            |response, operation| match response {
1871                Response::MetaExpr { result } => Ok(result),
1872                other => Err(unexpected_response(operation, &other)),
1873            },
1874        )
1875    }
1876
1877    #[expect(
1878        clippy::wildcard_enum_match_arm,
1879        reason = "round_trip deliberately collapses per-method Response wildcards into a uniform unexpected_response branch; a new variant surfaces at runtime, not compile time"
1880    )]
1881    pub(crate) fn worker_is_def_eq(
1882        &mut self,
1883        lhs: &str,
1884        rhs: &str,
1885        transparency: LeanWorkerMetaTransparency,
1886        options: &LeanWorkerElabOptions,
1887        cancellation: Option<&LeanWorkerCancellationToken>,
1888        progress: Option<&dyn LeanWorkerProgressSink>,
1889    ) -> Result<LeanWorkerMetaResult<bool>, LeanWorkerError> {
1890        self.round_trip(
1891            "worker_is_def_eq",
1892            Request::IsDefEq {
1893                lhs: lhs.to_owned(),
1894                rhs: rhs.to_owned(),
1895                transparency,
1896                options: options.clone(),
1897            },
1898            false,
1899            cancellation,
1900            progress,
1901            |response, operation| match response {
1902                Response::MetaBool { result } => Ok(result),
1903                other => Err(unexpected_response(operation, &other)),
1904            },
1905        )
1906    }
1907
1908    #[expect(
1909        clippy::wildcard_enum_match_arm,
1910        reason = "round_trip deliberately collapses per-method Response wildcards into a uniform unexpected_response branch; a new variant surfaces at runtime, not compile time"
1911    )]
1912    pub(crate) fn worker_describe(
1913        &mut self,
1914        name: &str,
1915        cancellation: Option<&LeanWorkerCancellationToken>,
1916        progress: Option<&dyn LeanWorkerProgressSink>,
1917    ) -> Result<Option<LeanWorkerDeclarationRow>, LeanWorkerError> {
1918        self.round_trip(
1919            "worker_describe",
1920            Request::Describe { name: name.to_owned() },
1921            false,
1922            cancellation,
1923            progress,
1924            |response, operation| match response {
1925                Response::Declaration { row } => Ok(row),
1926                other => Err(unexpected_response(operation, &other)),
1927            },
1928        )
1929    }
1930
1931    #[expect(
1932        clippy::wildcard_enum_match_arm,
1933        reason = "round_trip deliberately collapses per-method Response wildcards into a uniform unexpected_response branch; a new variant surfaces at runtime, not compile time"
1934    )]
1935    pub(crate) fn worker_search_declarations(
1936        &mut self,
1937        search: &LeanWorkerDeclarationSearch,
1938        cancellation: Option<&LeanWorkerCancellationToken>,
1939        progress: Option<&dyn LeanWorkerProgressSink>,
1940    ) -> Result<LeanWorkerDeclarationSearchResult, LeanWorkerError> {
1941        self.round_trip(
1942            "worker_search_declarations",
1943            Request::SearchDeclarations { search: search.clone() },
1944            false,
1945            cancellation,
1946            progress,
1947            |response, operation| match response {
1948                Response::DeclarationSearch { result } => Ok(result),
1949                other => Err(unexpected_response(operation, &other)),
1950            },
1951        )
1952    }
1953
1954    #[expect(
1955        clippy::wildcard_enum_match_arm,
1956        reason = "round_trip deliberately collapses per-method Response wildcards into a uniform unexpected_response branch; a new variant surfaces at runtime, not compile time"
1957    )]
1958    pub(crate) fn worker_declaration_type(
1959        &mut self,
1960        name: &str,
1961        max_bytes: usize,
1962        cancellation: Option<&LeanWorkerCancellationToken>,
1963        progress: Option<&dyn LeanWorkerProgressSink>,
1964    ) -> Result<Option<LeanWorkerDeclarationType>, LeanWorkerError> {
1965        self.round_trip(
1966            "worker_declaration_type",
1967            Request::DeclarationType {
1968                name: name.to_owned(),
1969                max_bytes,
1970            },
1971            false,
1972            cancellation,
1973            progress,
1974            |response, operation| match response {
1975                Response::DeclarationType { row } => Ok(row),
1976                other => Err(unexpected_response(operation, &other)),
1977            },
1978        )
1979    }
1980
1981    #[expect(
1982        clippy::wildcard_enum_match_arm,
1983        reason = "round_trip deliberately collapses per-method Response wildcards into a uniform unexpected_response branch; a new variant surfaces at runtime, not compile time"
1984    )]
1985    pub(crate) fn worker_inspect_declaration(
1986        &mut self,
1987        request: &LeanWorkerDeclarationInspectionRequest,
1988        cancellation: Option<&LeanWorkerCancellationToken>,
1989        progress: Option<&dyn LeanWorkerProgressSink>,
1990    ) -> Result<LeanWorkerDeclarationInspectionResult, LeanWorkerError> {
1991        self.round_trip(
1992            "worker_inspect_declaration",
1993            Request::InspectDeclaration {
1994                request: request.clone(),
1995            },
1996            false,
1997            cancellation,
1998            progress,
1999            |response, operation| match response {
2000                Response::DeclarationInspection { result } => Ok(result),
2001                other => Err(unexpected_response(operation, &other)),
2002            },
2003        )
2004    }
2005
2006    #[expect(
2007        clippy::wildcard_enum_match_arm,
2008        reason = "round_trip deliberately collapses per-method Response wildcards into a uniform unexpected_response branch; a new variant surfaces at runtime, not compile time"
2009    )]
2010    pub(crate) fn worker_attempt_proof(
2011        &mut self,
2012        request: &LeanWorkerProofAttemptRequest,
2013        options: &LeanWorkerElabOptions,
2014        cancellation: Option<&LeanWorkerCancellationToken>,
2015        progress: Option<&dyn LeanWorkerProgressSink>,
2016    ) -> Result<LeanWorkerProofAttemptResult, LeanWorkerError> {
2017        self.round_trip(
2018            "worker_attempt_proof",
2019            Request::AttemptProof {
2020                request: request.clone(),
2021                options: options.clone(),
2022                progress: progress.is_some(),
2023            },
2024            false,
2025            cancellation,
2026            progress,
2027            |response, operation| match response {
2028                Response::ProofAttempt { result } => Ok(result),
2029                other => Err(unexpected_response(operation, &other)),
2030            },
2031        )
2032    }
2033
2034    #[expect(
2035        clippy::wildcard_enum_match_arm,
2036        reason = "round_trip deliberately collapses per-method Response wildcards into a uniform unexpected_response branch; a new variant surfaces at runtime, not compile time"
2037    )]
2038    pub(crate) fn worker_verify_declaration(
2039        &mut self,
2040        request: &LeanWorkerDeclarationVerificationRequest,
2041        options: &LeanWorkerElabOptions,
2042        cancellation: Option<&LeanWorkerCancellationToken>,
2043        progress: Option<&dyn LeanWorkerProgressSink>,
2044    ) -> Result<LeanWorkerDeclarationVerificationResult, LeanWorkerError> {
2045        const OPERATION: &str = "worker_verify_declaration";
2046        let outcome = self.round_trip(
2047            OPERATION,
2048            Request::VerifyDeclaration {
2049                request: request.clone(),
2050                options: options.clone(),
2051                progress: progress.is_some(),
2052            },
2053            false,
2054            cancellation,
2055            progress,
2056            |response, operation| match response {
2057                Response::DeclarationVerification { result } => Ok(result),
2058                other => Err(unexpected_response(operation, &other)),
2059            },
2060        );
2061        match outcome {
2062            Ok(result) => Ok(result),
2063            // A read-only verification must never surface a child abort as a hard
2064            // error: the worker's own (best-effort) screen could not prevent a
2065            // residual metavariable panic, so the supervisor respawns and reports
2066            // the honest degraded verdict. Verification is monotone, so relabeling
2067            // a non-result to `BudgetExceeded` never downgrades an `Accepted`.
2068            Err(err) => {
2069                self.recover_child_abort(OPERATION, err)?;
2070                Ok(LeanWorkerDeclarationVerificationResult::Ok {
2071                    verification_status: LeanWorkerDeclarationVerificationStatus::BudgetExceeded,
2072                    facts: Box::new(LeanWorkerDeclarationVerificationFacts::unavailable()),
2073                    imports: Vec::new(),
2074                })
2075            }
2076        }
2077    }
2078
2079    #[expect(
2080        clippy::wildcard_enum_match_arm,
2081        reason = "round_trip deliberately collapses per-method Response wildcards into a uniform unexpected_response branch; a new variant surfaces at runtime, not compile time"
2082    )]
2083    pub(crate) fn worker_verify_declaration_batch(
2084        &mut self,
2085        request: &LeanWorkerDeclarationVerificationBatchRequest,
2086        options: &LeanWorkerElabOptions,
2087        cancellation: Option<&LeanWorkerCancellationToken>,
2088        progress: Option<&dyn LeanWorkerProgressSink>,
2089    ) -> Result<LeanWorkerDeclarationVerificationBatchResult, LeanWorkerError> {
2090        const OPERATION: &str = "worker_verify_declaration_batch";
2091        let outcome = self.round_trip(
2092            OPERATION,
2093            Request::VerifyDeclarationBatch {
2094                request: request.clone(),
2095                options: options.clone(),
2096                progress: progress.is_some(),
2097            },
2098            false,
2099            cancellation,
2100            progress,
2101            |response, operation| match response {
2102                Response::DeclarationVerificationBatch { result } => Ok(result),
2103                other => Err(unexpected_response(operation, &other)),
2104            },
2105        );
2106        match outcome {
2107            Ok(result) => Ok(result),
2108            Err(err) => {
2109                self.recover_child_abort(OPERATION, err)?;
2110                Ok(degraded_declaration_verification_batch_result(request))
2111            }
2112        }
2113    }
2114
2115    #[allow(
2116        clippy::needless_pass_by_value,
2117        reason = "filter is cheap to clone, passed by value matches caller shape"
2118    )]
2119    pub(crate) fn worker_list_declarations_strings(
2120        &mut self,
2121        filter: LeanWorkerDeclarationFilter,
2122        cancellation: Option<&LeanWorkerCancellationToken>,
2123        progress: Option<&dyn LeanWorkerProgressSink>,
2124    ) -> Result<Vec<String>, LeanWorkerError> {
2125        const OPERATION: &str = "worker_list_declarations_strings";
2126        check_cancelled(OPERATION, cancellation)?;
2127        self.prepare_request(false)?;
2128        self.send_request(Request::ListDeclarationsStrings {
2129            filter,
2130            progress: progress.is_some(),
2131        })?;
2132        self.record_request(false);
2133        let collector = DeclarationNameCollector::default();
2134        let response = self.read_response_with_events(
2135            OPERATION,
2136            progress,
2137            cancellation,
2138            Some(LeanWorkerDataSinkTarget::Raw(&collector)),
2139            None,
2140        )?;
2141        if let Some(message) = collector.decode_error.lock().ok().and_then(|guard| guard.clone()) {
2142            return Err(LeanWorkerError::Protocol { message });
2143        }
2144        match response {
2145            Response::RowsComplete { count } => {
2146                let names = collector.into_inner();
2147                let observed = u64::try_from(names.len()).unwrap_or(u64::MAX);
2148                if observed != count {
2149                    return Err(LeanWorkerError::Protocol {
2150                        message: format!(
2151                            "worker_list_declarations_strings: parent collected {observed} rows but child reported {count}"
2152                        ),
2153                    });
2154                }
2155                Ok(names)
2156            }
2157            other => Err(unexpected_response(OPERATION, &other)),
2158        }
2159    }
2160
2161    #[expect(
2162        clippy::wildcard_enum_match_arm,
2163        reason = "round_trip deliberately collapses per-method Response wildcards into a uniform unexpected_response branch; a new variant surfaces at runtime, not compile time"
2164    )]
2165    pub(crate) fn worker_describe_bulk(
2166        &mut self,
2167        names: &[&str],
2168        cancellation: Option<&LeanWorkerCancellationToken>,
2169        progress: Option<&dyn LeanWorkerProgressSink>,
2170    ) -> Result<Vec<LeanWorkerDeclarationRow>, LeanWorkerError> {
2171        self.round_trip(
2172            "worker_describe_bulk",
2173            Request::DescribeBulk {
2174                names: names.iter().map(|name| (*name).to_owned()).collect(),
2175                progress: progress.is_some(),
2176            },
2177            false,
2178            cancellation,
2179            progress,
2180            |response, operation| match response {
2181                Response::DeclarationBulk { rows } => Ok(rows),
2182                other => Err(unexpected_response(operation, &other)),
2183            },
2184        )
2185    }
2186
2187    #[expect(
2188        clippy::wildcard_enum_match_arm,
2189        reason = "round_trip deliberately collapses per-method Response wildcards into a uniform unexpected_response branch; a new variant surfaces at runtime, not compile time"
2190    )]
2191    pub(crate) fn worker_process_module_query(
2192        &mut self,
2193        source: &str,
2194        query: LeanWorkerModuleQuery,
2195        options: &LeanWorkerElabOptions,
2196        cancellation: Option<&LeanWorkerCancellationToken>,
2197        progress: Option<&dyn LeanWorkerProgressSink>,
2198    ) -> Result<LeanWorkerModuleQueryOutcome, LeanWorkerError> {
2199        self.round_trip(
2200            "worker_process_module_query",
2201            Request::ProcessModuleQuery {
2202                source: source.to_owned(),
2203                query,
2204                options: options.clone(),
2205            },
2206            false,
2207            cancellation,
2208            progress,
2209            |response, operation| match response {
2210                Response::ProcessModuleQuery { outcome } => Ok(outcome),
2211                other => Err(unexpected_response(operation, &other)),
2212            },
2213        )
2214    }
2215
2216    #[expect(
2217        clippy::wildcard_enum_match_arm,
2218        reason = "round_trip deliberately collapses per-method Response wildcards into a uniform unexpected_response branch; a new variant surfaces at runtime, not compile time"
2219    )]
2220    pub(crate) fn worker_process_module_query_batch(
2221        &mut self,
2222        source: &str,
2223        selectors: &[LeanWorkerModuleQuerySelector],
2224        budgets: &LeanWorkerOutputBudgets,
2225        options: &LeanWorkerElabOptions,
2226        cancellation: Option<&LeanWorkerCancellationToken>,
2227        progress: Option<&dyn LeanWorkerProgressSink>,
2228    ) -> Result<LeanWorkerModuleQueryBatchOutcome, LeanWorkerError> {
2229        const OPERATION: &str = "worker_process_module_query_batch";
2230        let outcome = self.round_trip(
2231            OPERATION,
2232            Request::ProcessModuleQueryBatch {
2233                source: source.to_owned(),
2234                selectors: selectors.to_vec(),
2235                budgets: budgets.clone(),
2236                options: options.clone(),
2237            },
2238            false,
2239            cancellation,
2240            progress,
2241            |response, operation| match response {
2242                Response::ProcessModuleQueryBatch { outcome } => Ok(outcome),
2243                other => Err(unexpected_response(operation, &other)),
2244            },
2245        );
2246        match outcome {
2247            Ok(outcome) => Ok(outcome),
2248            // As with verification: a child abort during a read-only proof-state
2249            // batch becomes a per-selector degraded item, not a hard error.
2250            Err(err) => {
2251                let resource = err.resource_exhausted_facts().cloned().unwrap_or_else(|| {
2252                    worker_resource_facts(
2253                        "worker_child_abort",
2254                        true,
2255                        Some(OPERATION),
2256                        &self.stats,
2257                        self.stats.last_rss_kib,
2258                        None,
2259                        None,
2260                    )
2261                });
2262                self.recover_child_abort(OPERATION, err)?;
2263                Ok(degraded_query_batch_outcome(selectors, resource))
2264            }
2265        }
2266    }
2267
2268    #[expect(
2269        clippy::wildcard_enum_match_arm,
2270        reason = "round_trip deliberately collapses per-method Response wildcards into a uniform unexpected_response branch; a new variant surfaces at runtime, not compile time"
2271    )]
2272    pub(crate) fn worker_clear_module_snapshot_cache(
2273        &mut self,
2274        cancellation: Option<&LeanWorkerCancellationToken>,
2275        progress: Option<&dyn LeanWorkerProgressSink>,
2276    ) -> Result<LeanWorkerModuleSnapshotCacheClearResult, LeanWorkerError> {
2277        self.round_trip(
2278            "worker_clear_module_snapshot_cache",
2279            Request::ClearModuleSnapshotCache,
2280            false,
2281            cancellation,
2282            progress,
2283            |response, operation| match response {
2284                Response::ModuleSnapshotCacheCleared { result } => Ok(result),
2285                other => Err(unexpected_response(operation, &other)),
2286            },
2287        )
2288    }
2289
2290    pub(crate) fn worker_run_data_stream(
2291        &mut self,
2292        export: &str,
2293        request: &serde_json::Value,
2294        rows: &dyn LeanWorkerDataSink,
2295        diagnostics: Option<&dyn LeanWorkerDiagnosticSink>,
2296        cancellation: Option<&LeanWorkerCancellationToken>,
2297        progress: Option<&dyn LeanWorkerProgressSink>,
2298    ) -> Result<LeanWorkerStreamSummary, LeanWorkerError> {
2299        self.worker_run_data_stream_with_sink(
2300            export,
2301            request,
2302            LeanWorkerDataSinkTarget::Value(rows),
2303            diagnostics,
2304            cancellation,
2305            progress,
2306        )
2307    }
2308
2309    pub(crate) fn worker_run_data_stream_raw(
2310        &mut self,
2311        export: &str,
2312        request: &serde_json::Value,
2313        rows: &dyn LeanWorkerRawDataSink,
2314        diagnostics: Option<&dyn LeanWorkerDiagnosticSink>,
2315        cancellation: Option<&LeanWorkerCancellationToken>,
2316        progress: Option<&dyn LeanWorkerProgressSink>,
2317    ) -> Result<LeanWorkerStreamSummary, LeanWorkerError> {
2318        self.worker_run_data_stream_with_sink(
2319            export,
2320            request,
2321            LeanWorkerDataSinkTarget::Raw(rows),
2322            diagnostics,
2323            cancellation,
2324            progress,
2325        )
2326    }
2327
2328    fn worker_run_data_stream_with_sink(
2329        &mut self,
2330        export: &str,
2331        request: &serde_json::Value,
2332        rows: LeanWorkerDataSinkTarget<'_>,
2333        diagnostics: Option<&dyn LeanWorkerDiagnosticSink>,
2334        cancellation: Option<&LeanWorkerCancellationToken>,
2335        progress: Option<&dyn LeanWorkerProgressSink>,
2336    ) -> Result<LeanWorkerStreamSummary, LeanWorkerError> {
2337        const OPERATION: &str = "worker_run_data_stream";
2338        check_cancelled(OPERATION, cancellation)?;
2339        let request_json = serde_json::to_string(request).map_err(|err| LeanWorkerError::Protocol {
2340            message: format!("worker data-stream request JSON encode failed: {err}"),
2341        })?;
2342        self.prepare_request(false)?;
2343        self.send_request(Request::RunDataStream {
2344            export: export.to_owned(),
2345            request_json,
2346            progress: progress.is_some(),
2347        })?;
2348        self.record_request(false);
2349        self.stats.stream_requests = self.stats.stream_requests.saturating_add(1);
2350        match self.read_response_with_events(OPERATION, progress, cancellation, Some(rows), diagnostics)? {
2351            Response::StreamComplete { summary } => Ok(summary.into()),
2352            Response::StreamExportFailed { status_byte } => {
2353                Err(LeanWorkerError::StreamExportFailed { status: status_byte })
2354            }
2355            Response::StreamCallbackFailed {
2356                status_byte,
2357                description,
2358            } => Err(LeanWorkerError::StreamCallbackFailed {
2359                status: status_byte,
2360                description,
2361            }),
2362            Response::StreamRowMalformed { message } => Err(LeanWorkerError::StreamRowMalformed { message }),
2363            other => Err(unexpected_response(OPERATION, &other)),
2364        }
2365    }
2366
2367    pub(crate) fn worker_capability_metadata(
2368        &mut self,
2369        export: &str,
2370        request: &serde_json::Value,
2371        cancellation: Option<&LeanWorkerCancellationToken>,
2372        progress: Option<&dyn LeanWorkerProgressSink>,
2373    ) -> Result<LeanWorkerCapabilityMetadata, LeanWorkerError> {
2374        let request_json = serde_json::to_string(request).map_err(|err| LeanWorkerError::Protocol {
2375            message: format!("worker capability metadata request JSON encode failed: {err}"),
2376        })?;
2377        self.round_trip(
2378            "worker_capability_metadata",
2379            Request::CapabilityMetadata {
2380                export: export.to_owned(),
2381                request_json,
2382            },
2383            false,
2384            cancellation,
2385            progress,
2386            |response, operation| match response {
2387                Response::CapabilityMetadata { metadata } => Ok(metadata),
2388                Response::CapabilityMetadataMalformed { message } => {
2389                    Err(LeanWorkerError::CapabilityMetadataMalformed { message })
2390                }
2391                other => Err(unexpected_response(operation, &other)),
2392            },
2393        )
2394    }
2395
2396    #[expect(
2397        clippy::wildcard_enum_match_arm,
2398        reason = "round_trip deliberately collapses per-method Response wildcards into a uniform unexpected_response branch; a new variant surfaces at runtime, not compile time"
2399    )]
2400    pub(crate) fn worker_capability_doctor(
2401        &mut self,
2402        export: &str,
2403        request: &serde_json::Value,
2404        cancellation: Option<&LeanWorkerCancellationToken>,
2405        progress: Option<&dyn LeanWorkerProgressSink>,
2406    ) -> Result<LeanWorkerDoctorReport, LeanWorkerError> {
2407        let request_json = serde_json::to_string(request).map_err(|err| LeanWorkerError::Protocol {
2408            message: format!("worker capability doctor request JSON encode failed: {err}"),
2409        })?;
2410        self.round_trip(
2411            "worker_capability_doctor",
2412            Request::CapabilityDoctor {
2413                export: export.to_owned(),
2414                request_json,
2415            },
2416            false,
2417            cancellation,
2418            progress,
2419            |response, operation| match response {
2420                Response::CapabilityDoctor { report } => Ok(report),
2421                Response::CapabilityDoctorMalformed { message } => {
2422                    Err(LeanWorkerError::CapabilityDoctorMalformed { message })
2423                }
2424                other => Err(unexpected_response(operation, &other)),
2425            },
2426        )
2427    }
2428
2429    #[expect(
2430        clippy::wildcard_enum_match_arm,
2431        reason = "round_trip deliberately collapses per-method Response wildcards into a uniform unexpected_response branch; a new variant surfaces at runtime, not compile time"
2432    )]
2433    pub(crate) fn worker_json_command(
2434        &mut self,
2435        export: &str,
2436        request_json: String,
2437        cancellation: Option<&LeanWorkerCancellationToken>,
2438        progress: Option<&dyn LeanWorkerProgressSink>,
2439    ) -> Result<String, LeanWorkerError> {
2440        self.round_trip(
2441            "worker_json_command",
2442            Request::JsonCommand {
2443                export: export.to_owned(),
2444                request_json,
2445            },
2446            false,
2447            cancellation,
2448            progress,
2449            |response, operation| match response {
2450                Response::JsonCommand { response_json } => Ok(response_json),
2451                other => Err(unexpected_response(operation, &other)),
2452            },
2453        )
2454    }
2455
2456    fn send_request(&mut self, request: Request) -> Result<(), LeanWorkerError> {
2457        self.ensure_running()?;
2458        self.write_request(request)
2459    }
2460
2461    fn write_request(&mut self, request: Request) -> Result<(), LeanWorkerError> {
2462        let max_frame_bytes = self.config.max_frame_bytes;
2463        let Some(stdin) = self.stdin.as_mut() else {
2464            return Err(self.dead_error());
2465        };
2466        write_frame(stdin, Message::Request(request), max_frame_bytes).map_err(|err| LeanWorkerError::Protocol {
2467            message: err.to_string(),
2468        })
2469    }
2470
2471    fn begin_in_flight(&mut self, operation: &'static str) -> InFlightRequest {
2472        let request = InFlightRequest {
2473            id: self.next_request_id,
2474            operation,
2475            generation: self.generation,
2476        };
2477        self.next_request_id = self.next_request_id.next();
2478        self.state = WorkerSupervisorState::Busy {
2479            request: request.clone(),
2480        };
2481        request
2482    }
2483
2484    fn mark_current_request_streaming(&mut self) {
2485        match &self.state {
2486            WorkerSupervisorState::Busy { request } | WorkerSupervisorState::Streaming { request } => {
2487                self.state = WorkerSupervisorState::Streaming {
2488                    request: request.clone(),
2489                };
2490            }
2491            _ => {}
2492        }
2493    }
2494
2495    fn finish_in_flight(&mut self) {
2496        if matches!(
2497            self.state,
2498            WorkerSupervisorState::Busy { .. } | WorkerSupervisorState::Streaming { .. }
2499        ) {
2500            self.state = WorkerSupervisorState::Idle {
2501                generation: self.generation,
2502            };
2503        }
2504    }
2505
2506    fn prepare_request(&mut self, import_like: bool) -> Result<(), LeanWorkerError> {
2507        self.ensure_running()?;
2508        if import_like {
2509            self.stats.import_like_admission_attempts = self.stats.import_like_admission_attempts.saturating_add(1);
2510            self.stats.last_import_like_rss_before_admission_kib = self.child_rss_kib();
2511        }
2512
2513        if let Some(limit) = self.config.restart_policy.max_requests
2514            && self.requests_since_restart >= limit
2515        {
2516            self.restart_with_reason(LeanWorkerRestartReason::MaxRequests { limit })?;
2517            if import_like {
2518                self.stats.import_like_admitted = self.stats.import_like_admitted.saturating_add(1);
2519            }
2520            return Ok(());
2521        }
2522
2523        if import_like
2524            && let Some(limit) = self.config.restart_policy.max_imports
2525            && self.imports_since_restart >= limit
2526        {
2527            self.restart_with_reason(LeanWorkerRestartReason::MaxImports { limit })?;
2528            self.stats.import_like_admitted = self.stats.import_like_admitted.saturating_add(1);
2529            return Ok(());
2530        }
2531
2532        if let Some(limit_kib) = self.config.restart_policy.max_rss_kib {
2533            match self.child_rss_kib() {
2534                Some(current_kib) if current_kib >= limit_kib => {
2535                    self.stats.last_rss_kib = Some(current_kib);
2536                    self.restart_with_reason(LeanWorkerRestartReason::RssCeiling {
2537                        current_kib,
2538                        limit_kib,
2539                        last_import_stats: self.stats.last_import_stats.clone(),
2540                    })?;
2541                    if import_like {
2542                        self.stats.import_like_admitted = self.stats.import_like_admitted.saturating_add(1);
2543                    }
2544                    return Ok(());
2545                }
2546                Some(current_kib) => {
2547                    self.stats.last_rss_kib = Some(current_kib);
2548                }
2549                None => {
2550                    self.stats.rss_samples_unavailable = self.stats.rss_samples_unavailable.saturating_add(1);
2551                }
2552            }
2553        }
2554
2555        if let Some(limit) = self.config.restart_policy.idle_restart_after {
2556            let idle_for = self.last_activity.elapsed();
2557            if idle_for >= limit {
2558                self.restart_with_reason(LeanWorkerRestartReason::Idle { idle_for, limit })?;
2559                if import_like {
2560                    self.stats.import_like_admitted = self.stats.import_like_admitted.saturating_add(1);
2561                }
2562                return Ok(());
2563            }
2564        }
2565
2566        if import_like {
2567            self.stats.import_like_admitted = self.stats.import_like_admitted.saturating_add(1);
2568        }
2569        Ok(())
2570    }
2571
2572    fn record_request(&mut self, import_like: bool) {
2573        self.stats.requests = self.stats.requests.saturating_add(1);
2574        self.requests_since_restart = self.requests_since_restart.saturating_add(1);
2575        if import_like {
2576            self.stats.imports = self.stats.imports.saturating_add(1);
2577            self.imports_since_restart = self.imports_since_restart.saturating_add(1);
2578        }
2579        self.last_activity = Instant::now();
2580    }
2581
2582    fn restart_with_reason(&mut self, reason: LeanWorkerRestartReason) -> Result<(), LeanWorkerError> {
2583        self.restart_with_reason_before_spawn(reason, || {})
2584    }
2585
    /// Stop the current child, run `before_spawn`, then replace `*self` with a
    /// freshly spawned supervisor that inherits the accumulated stats, the
    /// request-id counter and the restart window.
    ///
    /// Order of operations:
    ///
    /// 1. count the attempt and pick the shutdown intent from `reason`;
    /// 2. shut the child down;
    /// 3. invoke `before_spawn`;
    /// 4. check the restart budget via `admit_restart`;
    /// 5. record the restart, reset the since-restart counters and spawn the
    ///    replacement from a clone of the current config;
    /// 6. copy the bookkeeping onto the replacement and install it.
    ///
    /// # Errors
    ///
    /// Returns the shutdown, budget or spawn error. Each failure path bumps
    /// `stats.replacement_failures` and stores a
    /// `last_replacement_skipped_reason` (`"stop_failed"`,
    /// `"restart_limit_exceeded"` or `"spawn_failed"`); `*self` is not
    /// replaced in those cases.
    fn restart_with_reason_before_spawn(
        &mut self,
        reason: LeanWorkerRestartReason,
        before_spawn: impl FnOnce(),
    ) -> Result<(), LeanWorkerError> {
        let config = self.config.clone();
        // Also passed to `admit_restart` as "now" and used for `replacement_total`.
        let replacement_started = Instant::now();
        self.stats.replacement_attempts = self.stats.replacement_attempts.saturating_add(1);
        // Explicit and policy-driven restarts get a graceful stop; every other
        // reason kills the child outright.
        let stop_intent = if matches!(
            &reason,
            LeanWorkerRestartReason::Explicit
                | LeanWorkerRestartReason::MaxRequests { .. }
                | LeanWorkerRestartReason::MaxImports { .. }
                | LeanWorkerRestartReason::RssCeiling { .. }
                | LeanWorkerRestartReason::Idle { .. }
        ) {
            ShutdownIntent::Graceful
        } else {
            ShutdownIntent::KillOnly
        };
        if let Err(err) = self.shutdown_child(stop_intent) {
            self.stats.replacement_failures = self.stats.replacement_failures.saturating_add(1);
            self.stats.last_replacement_skipped_reason = Some("stop_failed".to_owned());
            return Err(err);
        }
        // Caller hook: runs after the old child has been stopped and before
        // the restart budget is consulted.
        before_spawn();
        if let Err(err) = self.admit_restart(replacement_started) {
            self.stats.replacement_budget_skipped = self.stats.replacement_budget_skipped.saturating_add(1);
            self.stats.replacement_failures = self.stats.replacement_failures.saturating_add(1);
            self.stats.last_replacement_skipped_reason = Some("restart_limit_exceeded".to_owned());
            return Err(err);
        }
        self.stats.replacement_budget_admitted = self.stats.replacement_budget_admitted.saturating_add(1);
        let next_generation = self.generation.next();
        self.stats.record_restart(reason);
        self.requests_since_restart = 0;
        self.imports_since_restart = 0;
        // `reason` was moved into `record_restart`; read its stable cause back
        // from the stats (presumably populated by that call), falling back to
        // "unknown" when nothing is stored.
        let reason = self
            .stats
            .last_restart_reason
            .as_ref()
            .map_or_else(|| "unknown".to_owned(), |reason| reason.stable_cause().to_owned());
        let mut next = match Self::spawn(&config) {
            Ok(next) => next,
            Err(err) => {
                self.stats.replacement_failures = self.stats.replacement_failures.saturating_add(1);
                self.stats.last_replacement_skipped_reason = Some("spawn_failed".to_owned());
                return Err(err);
            }
        };
        let next_request_id = self.next_request_id;
        // The handshake time is the one stat taken from the new supervisor;
        // everything else is carried over from the old one.
        let spawn_handshake = next.stats.last_spawn_handshake_elapsed.unwrap_or_default();
        let mut stats = self.stats.clone();
        stats.replacement_successes = stats.replacement_successes.saturating_add(1);
        stats.last_replacement_skipped_reason = None;
        stats.last_spawn_handshake_elapsed = Some(spawn_handshake);
        stats.last_replacement_timing = Some(LeanWorkerReplacementTiming {
            spawn_handshake,
            capability_load: stats.last_capability_load_elapsed.unwrap_or_default(),
            session_open_import: Duration::ZERO,
            first_command: stats.last_first_command_elapsed,
            warm_command: stats.last_warm_command_elapsed,
            replacement_total: replacement_started.elapsed(),
            replacement_reason: reason,
            replacement_budget_status: "synchronous-no-overlap".to_owned(),
        });
        // Transplant the bookkeeping so ids, generation and the restart window
        // continue across the replacement.
        next.stats = stats;
        next.generation = next_generation;
        next.next_request_id = next_request_id;
        next.state = WorkerSupervisorState::Idle {
            generation: next_generation,
        };
        next.restart_window.clone_from(&self.restart_window);
        next.last_activity = Instant::now();
        // Install the replacement; the previous supervisor value is dropped here.
        *self = next;
        Ok(())
    }
2663
2664    fn admit_restart(&mut self, now: Instant) -> Result<(), LeanWorkerError> {
2665        let limit = self.config.restart_policy.restart_intensity;
2666        while self
2667            .restart_window
2668            .front()
2669            .is_some_and(|instant| now.saturating_duration_since(*instant) >= limit.window)
2670        {
2671            let _ = self.restart_window.pop_front();
2672        }
2673        let restarts = u64::try_from(self.restart_window.len()).unwrap_or(u64::MAX);
2674        if restarts >= limit.max_restarts {
2675            self.state = WorkerSupervisorState::RestartExhausted {
2676                generation: self.generation,
2677            };
2678            return Err(LeanWorkerError::RestartLimitExceeded {
2679                restarts,
2680                window: limit.window,
2681            });
2682        }
2683        self.restart_window.push_back(now);
2684        Ok(())
2685    }
2686
2687    /// Absorb a child abort raised during a read-only request: respawn a fresh
2688    /// child so subsequent requests succeed, then return `Ok(())` so the caller
2689    /// can synthesise a degraded verdict. Any non-abort error (or a respawn
2690    /// failure) propagates unchanged.
2691    fn recover_child_abort(&mut self, operation: &'static str, err: LeanWorkerError) -> Result<(), LeanWorkerError> {
2692        if matches!(err, LeanWorkerError::ChildPanicOrAbort { .. }) {
2693            self.restart_with_reason(LeanWorkerRestartReason::ChildAbort { operation })
2694        } else {
2695            Err(err)
2696        }
2697    }
2698
2699    fn hard_rss_limit_exceeded(&mut self) -> Option<(u64, u64)> {
2700        let limit_kib = self.config.rss_hard_limit_kib?;
2701        match self.child_rss_kib() {
2702            Some(current_kib) if current_kib >= limit_kib => {
2703                self.stats.last_rss_kib = Some(current_kib);
2704                Some((current_kib, limit_kib))
2705            }
2706            Some(current_kib) => {
2707                self.stats.last_rss_kib = Some(current_kib);
2708                None
2709            }
2710            None => {
2711                self.stats.rss_samples_unavailable = self.stats.rss_samples_unavailable.saturating_add(1);
2712                None
2713            }
2714        }
2715    }
2716
2717    fn read_response(&mut self, operation: &'static str) -> Result<Response, LeanWorkerError> {
2718        self.read_response_with_events(operation, None, None, None, None)
2719    }
2720
2721    fn read_response_with_progress(
2722        &mut self,
2723        operation: &'static str,
2724        progress: Option<&dyn LeanWorkerProgressSink>,
2725        cancellation: Option<&LeanWorkerCancellationToken>,
2726    ) -> Result<Response, LeanWorkerError> {
2727        self.read_response_with_events(operation, progress, cancellation, None, None)
2728    }
2729
2730    /// Run one Request/Response round-trip and project the response into a
2731    /// typed value.
2732    ///
2733    /// Centralizes the cancel-check → send → record → read sequence so every
2734    /// `worker_*` helper above delegates here instead of repeating five
2735    /// identical lines plus a 22-variant wildcard arm. The `extract` closure
2736    /// receives the response together with the operation name; it returns the
2737    /// typed value, the typed error variant the protocol expects (e.g.,
2738    /// `*Malformed`), or `unexpected_response(operation, &other)` for any
2739    /// wire variant the operation never expects.
2740    fn round_trip<R>(
2741        &mut self,
2742        operation: &'static str,
2743        request: Request,
2744        import_like: bool,
2745        cancellation: Option<&LeanWorkerCancellationToken>,
2746        progress: Option<&dyn LeanWorkerProgressSink>,
2747        extract: impl FnOnce(Response, &'static str) -> Result<R, LeanWorkerError>,
2748    ) -> Result<R, LeanWorkerError> {
2749        check_cancelled(operation, cancellation)?;
2750        self.prepare_request(import_like)?;
2751        self.send_request(request)?;
2752        self.record_request(import_like);
2753        let response = self.read_response_with_progress(operation, progress, cancellation)?;
2754        extract(response, operation)
2755    }
2756
2757    fn read_response_with_events(
2758        &mut self,
2759        operation: &'static str,
2760        progress: Option<&dyn LeanWorkerProgressSink>,
2761        cancellation: Option<&LeanWorkerCancellationToken>,
2762        data: Option<LeanWorkerDataSinkTarget<'_>>,
2763        diagnostics: Option<&dyn LeanWorkerDiagnosticSink>,
2764    ) -> Result<Response, LeanWorkerError> {
2765        let started = Instant::now();
2766        let timeout = self.config.request_timeout;
2767        let deadline = started.checked_add(timeout);
2768        let streaming = data.is_some();
2769        let mut request_backpressure_waits = 0_u64;
2770        let stdout = self.stdout.take().ok_or_else(|| self.dead_error())?;
2771        let max_frame_bytes = self.config.max_frame_bytes;
2772        let request = self.begin_in_flight(operation);
2773        let generation = request.generation;
2774        let request_id = Some(request.id);
2775        let (sender, receiver) = mpsc::sync_channel(WORKER_EVENT_BUFFER_CAPACITY);
2776        let reader =
2777            thread::spawn(move || read_request_messages(stdout, sender, max_frame_bytes, generation, request_id));
2778
2779        loop {
2780            if let Some((current_kib, limit_kib)) = self.hard_rss_limit_exceeded() {
2781                if streaming {
2782                    self.record_stream_failure(started, request_backpressure_waits);
2783                }
2784                let last_import_stats = self.stats.last_import_stats.clone();
2785                if let Err(err) = self.restart_with_reason_before_spawn(
2786                    LeanWorkerRestartReason::RssHardLimit {
2787                        operation,
2788                        current_kib,
2789                        limit_kib,
2790                        last_import_stats: last_import_stats.clone(),
2791                    },
2792                    || {
2793                        drop(receiver);
2794                        drop(reader.join());
2795                    },
2796                ) {
2797                    self.finish_in_flight();
2798                    return Err(err);
2799                }
2800                self.finish_in_flight();
2801                return Err(LeanWorkerError::RssHardLimitExceeded {
2802                    operation,
2803                    current_kib,
2804                    limit_kib,
2805                    last_import_stats: last_import_stats.map(Box::new),
2806                    resource: Box::new(worker_resource_facts(
2807                        "worker_rss_hard_limit",
2808                        true,
2809                        Some(operation),
2810                        &self.stats,
2811                        Some(current_kib),
2812                        Some(limit_kib),
2813                        None,
2814                    )),
2815                });
2816            }
2817            let event = match deadline.and_then(|deadline| deadline.checked_duration_since(Instant::now())) {
2818                Some(remaining) if remaining.is_zero() => {
2819                    if streaming {
2820                        self.record_stream_failure(started, request_backpressure_waits);
2821                    }
2822                    if let Err(err) = self.restart_with_reason_before_spawn(
2823                        LeanWorkerRestartReason::RequestTimeout {
2824                            operation,
2825                            duration: timeout,
2826                        },
2827                        || {
2828                            drop(receiver);
2829                            drop(reader.join());
2830                        },
2831                    ) {
2832                        self.finish_in_flight();
2833                        return Err(err);
2834                    }
2835                    self.finish_in_flight();
2836                    return Err(LeanWorkerError::Timeout {
2837                        operation,
2838                        duration: timeout,
2839                        resource: Box::new(worker_resource_facts(
2840                            "worker_timeout",
2841                            true,
2842                            Some(operation),
2843                            &self.stats,
2844                            self.stats.last_rss_kib,
2845                            None,
2846                            Some(timeout),
2847                        )),
2848                    });
2849                }
2850                Some(remaining) => {
2851                    let hard_watch_enabled = self.config.rss_hard_limit_kib.is_some();
2852                    let wait_for = if hard_watch_enabled {
2853                        remaining.min(self.config.rss_sample_interval)
2854                    } else {
2855                        remaining
2856                    };
2857                    match receiver.recv_timeout(wait_for) {
2858                        Ok(event) => event,
2859                        Err(mpsc::RecvTimeoutError::Timeout) if hard_watch_enabled && wait_for < remaining => {
2860                            continue;
2861                        }
2862                        Err(mpsc::RecvTimeoutError::Timeout) => {
2863                            if streaming {
2864                                self.record_stream_failure(started, request_backpressure_waits);
2865                            }
2866                            if let Err(err) = self.restart_with_reason_before_spawn(
2867                                LeanWorkerRestartReason::RequestTimeout {
2868                                    operation,
2869                                    duration: timeout,
2870                                },
2871                                || {
2872                                    drop(receiver);
2873                                    drop(reader.join());
2874                                },
2875                            ) {
2876                                self.finish_in_flight();
2877                                return Err(err);
2878                            }
2879                            self.finish_in_flight();
2880                            return Err(LeanWorkerError::Timeout {
2881                                operation,
2882                                duration: timeout,
2883                                resource: Box::new(worker_resource_facts(
2884                                    "worker_timeout",
2885                                    true,
2886                                    Some(operation),
2887                                    &self.stats,
2888                                    self.stats.last_rss_kib,
2889                                    None,
2890                                    Some(timeout),
2891                                )),
2892                            });
2893                        }
2894                        Err(mpsc::RecvTimeoutError::Disconnected) => {
2895                            self.finish_in_flight();
2896                            drop(reader.join());
2897                            return Err(LeanWorkerError::Protocol {
2898                                message: "worker response reader exited without a terminal response".to_owned(),
2899                            });
2900                        }
2901                    }
2902                }
2903                None => match receiver.recv() {
2904                    Ok(event) => event,
2905                    Err(_err) => {
2906                        self.finish_in_flight();
2907                        drop(reader.join());
2908                        return Err(LeanWorkerError::Protocol {
2909                            message: "worker response reader exited without a terminal response".to_owned(),
2910                        });
2911                    }
2912                },
2913            };
2914            if event.generation() != self.generation {
2915                let actual_generation = event.generation();
2916                self.finish_in_flight();
2917                drop(reader.join());
2918                return Err(stale_worker_output_error(operation, self.generation, actual_generation));
2919            }
2920            if let Some(actual_request_id) = event.request_id()
2921                && self.state.current_request_id() != Some(actual_request_id)
2922            {
2923                self.finish_in_flight();
2924                drop(reader.join());
2925                return Err(stale_worker_request_output_error(
2926                    operation,
2927                    self.state.current_request_id(),
2928                    actual_request_id,
2929                ));
2930            }
2931            request_backpressure_waits = request_backpressure_waits.saturating_add(event.backpressure_waits());
2932            self.stats.backpressure_waits = self.stats.backpressure_waits.saturating_add(event.backpressure_waits());
2933
2934            let message = match event {
2935                RequestReaderEvent::Message { message, .. } => message,
2936                RequestReaderEvent::Terminal { message, stdout, .. } => {
2937                    self.stdout = Some(stdout);
2938                    match message {
2939                        Message::Response(Response::Error { code, message }) => {
2940                            self.terminalize_request_failure(streaming, started, request_backpressure_waits);
2941                            drop(reader.join());
2942                            return Err(LeanWorkerError::Worker { code, message });
2943                        }
2944                        Message::Response(response) => {
2945                            let response = self.terminalize_request_response(
2946                                response,
2947                                streaming,
2948                                started,
2949                                request_backpressure_waits,
2950                            );
2951                            drop(reader.join());
2952                            return Ok(response);
2953                        }
2954                        other => {
2955                            self.terminalize_request_failure(streaming, started, request_backpressure_waits);
2956                            drop(reader.join());
2957                            return Err(LeanWorkerError::Protocol {
2958                                message: format!("worker sent unexpected {operation} message: {other:?}"),
2959                            });
2960                        }
2961                    }
2962                }
2963                RequestReaderEvent::ReadError { message, eof, .. } => {
2964                    drop(reader.join());
2965                    self.terminalize_request_failure(streaming, started, request_backpressure_waits);
2966                    return if eof {
2967                        Err(self.record_exit_error())
2968                    } else {
2969                        Err(LeanWorkerError::Protocol { message })
2970                    };
2971                }
2972            };
2973
2974            match message {
2975                Message::ProgressTick(tick) => {
2976                    self.mark_current_request_streaming();
2977                    if let Err(err) =
2978                        report_parent_progress(progress, elapsed_event(tick.phase, tick.current, tick.total, started))
2979                    {
2980                        self.terminalize_request_failure(streaming, started, request_backpressure_waits);
2981                        return Err(err);
2982                    }
2983                    if cancellation.is_some_and(LeanWorkerCancellationToken::is_cancelled) {
2984                        if streaming {
2985                            self.record_stream_failure(started, request_backpressure_waits);
2986                        }
2987                        if let Err(err) = self.restart_with_reason_before_spawn(
2988                            LeanWorkerRestartReason::Cancelled { operation },
2989                            || {
2990                                drop(receiver);
2991                                drop(reader.join());
2992                            },
2993                        ) {
2994                            self.finish_in_flight();
2995                            return Err(err);
2996                        }
2997                        self.finish_in_flight();
2998                        return Err(LeanWorkerError::Cancelled {
2999                            operation,
3000                            resource: Box::new(worker_resource_facts(
3001                                "worker_cancelled",
3002                                true,
3003                                Some(operation),
3004                                &self.stats,
3005                                self.stats.last_rss_kib,
3006                                None,
3007                                None,
3008                            )),
3009                        });
3010                    }
3011                }
3012                Message::DataRow(row) => {
3013                    self.mark_current_request_streaming();
3014                    let payload_bytes = row.payload.get().len() as u64;
3015                    if let Err(err) = report_parent_data_row(data, row) {
3016                        self.terminalize_request_failure(streaming, started, request_backpressure_waits);
3017                        return Err(err);
3018                    }
3019                    self.stats.data_rows_delivered = self.stats.data_rows_delivered.saturating_add(1);
3020                    self.stats.data_row_payload_bytes = self.stats.data_row_payload_bytes.saturating_add(payload_bytes);
3021                    if cancellation.is_some_and(LeanWorkerCancellationToken::is_cancelled) {
3022                        if streaming {
3023                            self.record_stream_failure(started, request_backpressure_waits);
3024                        }
3025                        if let Err(err) = self.restart_with_reason_before_spawn(
3026                            LeanWorkerRestartReason::Cancelled { operation },
3027                            || {
3028                                drop(receiver);
3029                                drop(reader.join());
3030                            },
3031                        ) {
3032                            self.finish_in_flight();
3033                            return Err(err);
3034                        }
3035                        self.finish_in_flight();
3036                        return Err(LeanWorkerError::Cancelled {
3037                            operation,
3038                            resource: Box::new(worker_resource_facts(
3039                                "worker_cancelled",
3040                                true,
3041                                Some(operation),
3042                                &self.stats,
3043                                self.stats.last_rss_kib,
3044                                None,
3045                                None,
3046                            )),
3047                        });
3048                    }
3049                }
3050                Message::Diagnostic(diagnostic) => {
3051                    self.mark_current_request_streaming();
3052                    if let Err(err) = report_parent_diagnostic(diagnostics, diagnostic.into()) {
3053                        self.terminalize_request_failure(streaming, started, request_backpressure_waits);
3054                        return Err(err);
3055                    }
3056                }
3057                Message::Response(response) => {
3058                    self.terminalize_request_failure(streaming, started, request_backpressure_waits);
3059                    return Err(unexpected_response(operation, &response));
3060                }
3061                other => {
3062                    self.terminalize_request_failure(streaming, started, request_backpressure_waits);
3063                    return Err(LeanWorkerError::Protocol {
3064                        message: format!("worker sent unexpected {operation} message: {other:?}"),
3065                    });
3066                }
3067            }
3068        }
3069    }
3070
3071    fn ensure_running(&mut self) -> Result<(), LeanWorkerError> {
3072        if self.state.rejects_new_requests() {
3073            return Err(LeanWorkerError::ShutdownInProgress {
3074                operation: self.state.current_operation().unwrap_or("worker_request"),
3075            });
3076        }
3077        match self.status()? {
3078            LeanWorkerStatus::Running => Ok(()),
3079            LeanWorkerStatus::Exited(exit) if exit.success => Err(LeanWorkerError::ChildExited { exit }),
3080            LeanWorkerStatus::Exited(exit) => Err(LeanWorkerError::ChildPanicOrAbort { exit }),
3081        }
3082    }
3083
3084    fn terminalize_request_response(
3085        &mut self,
3086        response: Response,
3087        streaming: bool,
3088        started: Instant,
3089        backpressure_waits: u64,
3090    ) -> Response {
3091        if streaming {
3092            if matches!(
3093                response,
3094                Response::StreamComplete { .. } | Response::RowsComplete { .. }
3095            ) {
3096                self.record_stream_success(started);
3097            } else {
3098                self.record_stream_failure(started, backpressure_waits);
3099            }
3100        }
3101        self.finish_in_flight();
3102        response
3103    }
3104
3105    fn terminalize_request_failure(&mut self, streaming: bool, started: Instant, backpressure_waits: u64) {
3106        if streaming {
3107            self.record_stream_failure(started, backpressure_waits);
3108        }
3109        self.finish_in_flight();
3110    }
3111
3112    fn record_stream_success(&mut self, started: Instant) {
3113        self.stats.stream_successes = self.stats.stream_successes.saturating_add(1);
3114        self.stats.stream_elapsed = self.stats.stream_elapsed.saturating_add(started.elapsed());
3115    }
3116
3117    fn record_stream_failure(&mut self, started: Instant, backpressure_waits: u64) {
3118        self.stats.stream_failures = self.stats.stream_failures.saturating_add(1);
3119        self.stats.stream_elapsed = self.stats.stream_elapsed.saturating_add(started.elapsed());
3120        if backpressure_waits > 0 {
3121            self.stats.backpressure_failures = self.stats.backpressure_failures.saturating_add(1);
3122        }
3123    }
3124
3125    fn wait_for_exit(&mut self) -> Result<LeanWorkerExit, LeanWorkerError> {
3126        let Some(child) = self.child.as_mut() else {
3127            return Err(self.dead_error());
3128        };
3129        self.state = WorkerSupervisorState::Reaping {
3130            generation: self.generation,
3131        };
3132        let status = child.wait().map_err(|source| LeanWorkerError::Wait { source })?;
3133        Ok(self.finalize_child_exit(status))
3134    }
3135
3136    fn wait_for_exit_bounded(
3137        &mut self,
3138        operation: &'static str,
3139        timeout: Duration,
3140    ) -> Result<(LeanWorkerExit, Duration), LeanWorkerError> {
3141        let started = Instant::now();
3142        loop {
3143            let Some(child) = self.child.as_mut() else {
3144                return Err(self.dead_error());
3145            };
3146            self.state = WorkerSupervisorState::Reaping {
3147                generation: self.generation,
3148            };
3149            match child.try_wait().map_err(|source| LeanWorkerError::Wait { source })? {
3150                Some(status) => return Ok((self.finalize_child_exit(status), started.elapsed())),
3151                None if started.elapsed() >= timeout => {
3152                    return Err(LeanWorkerError::WaitTimeout {
3153                        operation,
3154                        duration: timeout,
3155                    });
3156                }
3157                None => thread::sleep(Duration::from_millis(10).min(timeout.saturating_sub(started.elapsed()))),
3158            }
3159        }
3160    }
3161
3162    fn finalize_child_exit(&mut self, status: ExitStatus) -> LeanWorkerExit {
3163        let diagnostics = self.read_stderr();
3164        let exit = LeanWorkerExit::from_status(status, diagnostics);
3165        self.last_exit = Some(exit.clone());
3166        self.child = None;
3167        self.stdin = None;
3168        self.stdout = None;
3169        self.finish_in_flight();
3170        self.state = WorkerSupervisorState::Exited {
3171            generation: self.generation,
3172        };
3173        self.stats.exits = self.stats.exits.saturating_add(1);
3174        exit
3175    }
3176
3177    fn record_exit_error(&mut self) -> LeanWorkerError {
3178        self.state = WorkerSupervisorState::Crashed {
3179            generation: self.generation,
3180        };
3181        match self.wait_for_exit() {
3182            Ok(exit) if exit.success => LeanWorkerError::ChildExited { exit },
3183            Ok(exit) => LeanWorkerError::ChildPanicOrAbort { exit },
3184            Err(err) => err,
3185        }
3186    }
3187
3188    fn shutdown_child(&mut self, intent: ShutdownIntent) -> Result<LeanWorkerShutdownReport, LeanWorkerError> {
3189        let started = Instant::now();
3190        let graceful_timeout = self.config.shutdown_timeout;
3191        match self.status()? {
3192            LeanWorkerStatus::Exited(exit) => {
3193                return Ok(LeanWorkerShutdownReport {
3194                    outcome: LeanWorkerShutdownOutcome::AlreadyExited,
3195                    exit,
3196                    graceful_timeout,
3197                    elapsed: started.elapsed(),
3198                    kill_elapsed: None,
3199                    wait_elapsed: Duration::ZERO,
3200                });
3201            }
3202            LeanWorkerStatus::Running => {}
3203        }
3204
3205        self.state = if intent == ShutdownIntent::Graceful {
3206            WorkerSupervisorState::Stopping {
3207                generation: self.generation,
3208            }
3209        } else {
3210            WorkerSupervisorState::Killing {
3211                generation: self.generation,
3212            }
3213        };
3214        self.finish_in_flight();
3215
3216        if intent == ShutdownIntent::Graceful {
3217            match self.write_request(Request::Terminate) {
3218                Ok(()) => return self.wait_for_graceful_shutdown(started, graceful_timeout),
3219                Err(LeanWorkerError::Protocol { .. } | LeanWorkerError::Worker { .. }) => {
3220                    return self.kill_and_report(
3221                        started,
3222                        graceful_timeout,
3223                        LeanWorkerShutdownOutcome::GracefulProtocolFailedKilled,
3224                    );
3225                }
3226                Err(err) => return Err(err),
3227            }
3228        }
3229
3230        self.kill_and_report(started, graceful_timeout, LeanWorkerShutdownOutcome::KillOnly)
3231    }
3232
3233    fn wait_for_graceful_shutdown(
3234        &mut self,
3235        started: Instant,
3236        graceful_timeout: Duration,
3237    ) -> Result<LeanWorkerShutdownReport, LeanWorkerError> {
3238        let grace_started = Instant::now();
3239        let stdout = self.stdout.take().ok_or_else(|| self.dead_error())?;
3240        let max_frame_bytes = self.config.max_frame_bytes;
3241        let generation = self.generation;
3242        let (sender, receiver) = mpsc::sync_channel(WORKER_EVENT_BUFFER_CAPACITY);
3243        let reader = thread::spawn(move || read_request_messages(stdout, sender, max_frame_bytes, generation, None));
3244
3245        loop {
3246            if let Some(child) = self.child.as_mut()
3247                && let Some(status) = child.try_wait().map_err(|source| LeanWorkerError::Wait { source })?
3248            {
3249                drop(receiver);
3250                drop(reader.join());
3251                let wait_elapsed = grace_started.elapsed();
3252                let exit = self.finalize_child_exit(status);
3253                return Ok(LeanWorkerShutdownReport {
3254                    outcome: LeanWorkerShutdownOutcome::Graceful,
3255                    exit,
3256                    graceful_timeout,
3257                    elapsed: started.elapsed(),
3258                    kill_elapsed: None,
3259                    wait_elapsed,
3260                });
3261            }
3262
3263            let elapsed = grace_started.elapsed();
3264            if elapsed >= graceful_timeout {
3265                let kill_started = Instant::now();
3266                if let Some(child) = self.child.as_mut() {
3267                    child.kill().map_err(|source| LeanWorkerError::Kill { source })?;
3268                }
3269                drop(receiver);
3270                drop(reader.join());
3271                let (exit, wait_elapsed) =
3272                    self.wait_for_exit_bounded("kill_wait", LEAN_WORKER_KILL_WAIT_TIMEOUT_DEFAULT)?;
3273                return Ok(LeanWorkerShutdownReport {
3274                    outcome: LeanWorkerShutdownOutcome::GracefulTimedOutKilled,
3275                    exit,
3276                    graceful_timeout,
3277                    elapsed: started.elapsed(),
3278                    kill_elapsed: Some(kill_started.elapsed()),
3279                    wait_elapsed,
3280                });
3281            }
3282
3283            let receive_timeout = Duration::from_millis(10).min(graceful_timeout.saturating_sub(elapsed));
3284            match receiver.recv_timeout(receive_timeout) {
3285                Ok(RequestReaderEvent::Terminal {
3286                    message: Message::Response(Response::Terminating),
3287                    stdout,
3288                    ..
3289                }) => {
3290                    drop(reader.join());
3291                    self.stdout = Some(stdout);
3292                    let remaining = graceful_timeout.saturating_sub(grace_started.elapsed());
3293                    match self.wait_for_exit_bounded("shutdown", remaining) {
3294                        Ok((exit, wait_elapsed)) => {
3295                            return Ok(LeanWorkerShutdownReport {
3296                                outcome: LeanWorkerShutdownOutcome::Graceful,
3297                                exit,
3298                                graceful_timeout,
3299                                elapsed: started.elapsed(),
3300                                kill_elapsed: None,
3301                                wait_elapsed,
3302                            });
3303                        }
3304                        Err(LeanWorkerError::WaitTimeout { .. }) => {
3305                            return self.kill_and_report(
3306                                started,
3307                                graceful_timeout,
3308                                LeanWorkerShutdownOutcome::GracefulTimedOutKilled,
3309                            );
3310                        }
3311                        Err(err) => return Err(err),
3312                    }
3313                }
3314                Ok(RequestReaderEvent::Terminal { stdout, .. }) => {
3315                    drop(reader.join());
3316                    self.stdout = Some(stdout);
3317                    return self.kill_and_report(
3318                        started,
3319                        graceful_timeout,
3320                        LeanWorkerShutdownOutcome::GracefulProtocolFailedKilled,
3321                    );
3322                }
3323                Ok(RequestReaderEvent::ReadError { eof: true, .. }) => {
3324                    drop(reader.join());
3325                    let remaining = graceful_timeout.saturating_sub(grace_started.elapsed());
3326                    match self.wait_for_exit_bounded("shutdown", remaining) {
3327                        Ok((exit, wait_elapsed)) => {
3328                            return Ok(LeanWorkerShutdownReport {
3329                                outcome: LeanWorkerShutdownOutcome::Graceful,
3330                                exit,
3331                                graceful_timeout,
3332                                elapsed: started.elapsed(),
3333                                kill_elapsed: None,
3334                                wait_elapsed,
3335                            });
3336                        }
3337                        Err(LeanWorkerError::WaitTimeout { .. }) => {
3338                            return self.kill_and_report(
3339                                started,
3340                                graceful_timeout,
3341                                LeanWorkerShutdownOutcome::GracefulTimedOutKilled,
3342                            );
3343                        }
3344                        Err(err) => return Err(err),
3345                    }
3346                }
3347                Ok(RequestReaderEvent::ReadError { .. }) | Err(mpsc::RecvTimeoutError::Disconnected) => {
3348                    drop(reader.join());
3349                    return self.kill_and_report(
3350                        started,
3351                        graceful_timeout,
3352                        LeanWorkerShutdownOutcome::GracefulProtocolFailedKilled,
3353                    );
3354                }
3355                Ok(RequestReaderEvent::Message { .. }) | Err(mpsc::RecvTimeoutError::Timeout) => {}
3356            }
3357        }
3358    }
3359
3360    fn kill_and_report(
3361        &mut self,
3362        started: Instant,
3363        graceful_timeout: Duration,
3364        outcome: LeanWorkerShutdownOutcome,
3365    ) -> Result<LeanWorkerShutdownReport, LeanWorkerError> {
3366        let kill_started = Instant::now();
3367        self.state = WorkerSupervisorState::Killing {
3368            generation: self.generation,
3369        };
3370        if let Some(child) = self.child.as_mut() {
3371            child.kill().map_err(|source| LeanWorkerError::Kill { source })?;
3372        }
3373        let (exit, wait_elapsed) = self.wait_for_exit_bounded("kill_wait", LEAN_WORKER_KILL_WAIT_TIMEOUT_DEFAULT)?;
3374        Ok(LeanWorkerShutdownReport {
3375            outcome,
3376            exit,
3377            graceful_timeout,
3378            elapsed: started.elapsed(),
3379            kill_elapsed: Some(kill_started.elapsed()),
3380            wait_elapsed,
3381        })
3382    }
3383
3384    fn dead_error(&self) -> LeanWorkerError {
3385        let exit = self.last_exit.clone().unwrap_or_else(|| LeanWorkerExit {
3386            success: false,
3387            code: None,
3388            status: "worker is not running".to_owned(),
3389            diagnostics: String::new(),
3390        });
3391        if exit.success {
3392            LeanWorkerError::ChildExited { exit }
3393        } else {
3394            LeanWorkerError::ChildPanicOrAbort { exit }
3395        }
3396    }
3397
3398    fn read_stderr(&mut self) -> String {
3399        let mut diagnostics = String::new();
3400        if let Some(mut pipe) = self.stderr.take() {
3401            drop(pipe.read_to_string(&mut diagnostics));
3402        }
3403        diagnostics
3404    }
3405
3406    fn child_rss_kib(&mut self) -> Option<u64> {
3407        let child = self.child.as_mut()?;
3408        child_rss_kib(child.id())
3409    }
3410}
3411
/// Event sent from the stdout reader thread (`read_request_messages`) to the
/// supervisor over a bounded channel.
///
/// Every variant carries the `generation` and `request_id` the reader thread
/// was started with, plus a `backpressure_waits` counter that
/// `send_reader_event` bumps when the channel was full at send time.
enum RequestReaderEvent {
    /// A frame whose message is not a `Message::Response`; the reader keeps
    /// reading after sending it.
    Message {
        generation: WorkerGeneration,
        request_id: Option<WorkerRequestId>,
        message: Message,
        backpressure_waits: u64,
    },
    /// A frame whose message is a `Message::Response`. The reader thread stops
    /// after sending it and hands the stdout reader back in `stdout`.
    Terminal {
        generation: WorkerGeneration,
        request_id: Option<WorkerRequestId>,
        message: Message,
        stdout: BufReader<ChildStdout>,
        backpressure_waits: u64,
    },
    /// Reading a frame failed. `message` is the error text and `eof` records
    /// whether the error was an end-of-file; the reader thread stops after
    /// sending it.
    ReadError {
        generation: WorkerGeneration,
        request_id: Option<WorkerRequestId>,
        message: String,
        eof: bool,
        backpressure_waits: u64,
    },
}
3434
/// How `shutdown_child` should stop the worker child.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
enum ShutdownIntent {
    /// Send `Request::Terminate` and wait for the child to exit, killing it
    /// only if that fails or times out.
    Graceful,
    /// Kill the child without asking it to terminate first.
    KillOnly,
}
3440
3441impl RequestReaderEvent {
3442    fn generation(&self) -> WorkerGeneration {
3443        match self {
3444            Self::Message { generation, .. }
3445            | Self::Terminal { generation, .. }
3446            | Self::ReadError { generation, .. } => *generation,
3447        }
3448    }
3449
3450    fn backpressure_waits(&self) -> u64 {
3451        match self {
3452            Self::Message { backpressure_waits, .. }
3453            | Self::Terminal { backpressure_waits, .. }
3454            | Self::ReadError { backpressure_waits, .. } => *backpressure_waits,
3455        }
3456    }
3457
3458    fn request_id(&self) -> Option<WorkerRequestId> {
3459        match self {
3460            Self::Message { request_id, .. }
3461            | Self::Terminal { request_id, .. }
3462            | Self::ReadError { request_id, .. } => *request_id,
3463        }
3464    }
3465
3466    fn add_backpressure_wait(&mut self) {
3467        match self {
3468            Self::Message { backpressure_waits, .. }
3469            | Self::Terminal { backpressure_waits, .. }
3470            | Self::ReadError { backpressure_waits, .. } => {
3471                *backpressure_waits = backpressure_waits.saturating_add(1);
3472            }
3473        }
3474    }
3475}
3476
3477#[allow(
3478    clippy::needless_pass_by_value,
3479    reason = "the request reader thread must own the sender"
3480)]
3481fn read_request_messages(
3482    mut stdout: BufReader<ChildStdout>,
3483    sender: mpsc::SyncSender<RequestReaderEvent>,
3484    max_frame_bytes: u32,
3485    generation: WorkerGeneration,
3486    request_id: Option<WorkerRequestId>,
3487) {
3488    loop {
3489        match read_frame(&mut stdout, max_frame_bytes) {
3490            Ok(frame) if matches!(frame.message, Message::Response(_)) => {
3491                let _ = send_reader_event(
3492                    &sender,
3493                    RequestReaderEvent::Terminal {
3494                        generation,
3495                        request_id,
3496                        message: frame.message,
3497                        stdout,
3498                        backpressure_waits: 0,
3499                    },
3500                );
3501                return;
3502            }
3503            Ok(frame) => {
3504                if send_reader_event(
3505                    &sender,
3506                    RequestReaderEvent::Message {
3507                        generation,
3508                        request_id,
3509                        message: frame.message,
3510                        backpressure_waits: 0,
3511                    },
3512                )
3513                .is_err()
3514                {
3515                    return;
3516                }
3517            }
3518            Err(err) => {
3519                let _ = send_reader_event(
3520                    &sender,
3521                    RequestReaderEvent::ReadError {
3522                        generation,
3523                        request_id,
3524                        message: err.to_string(),
3525                        eof: err.is_eof(),
3526                        backpressure_waits: 0,
3527                    },
3528                );
3529                return;
3530            }
3531        }
3532    }
3533}
3534
3535fn send_reader_event(sender: &mpsc::SyncSender<RequestReaderEvent>, event: RequestReaderEvent) -> Result<(), ()> {
3536    match sender.try_send(event) {
3537        Ok(()) => Ok(()),
3538        Err(mpsc::TrySendError::Full(mut event)) => {
3539            event.add_backpressure_wait();
3540            sender.send(event).map_err(|_| ())
3541        }
3542        Err(mpsc::TrySendError::Disconnected(_event)) => Err(()),
3543    }
3544}
3545
// Dropping a `LeanWorker` requests a graceful shutdown of its child.
impl Drop for LeanWorker {
    fn drop(&mut self) {
        // `drop` cannot report failures, so the shutdown result is discarded.
        drop(self.shutdown_child(ShutdownIntent::Graceful));
    }
}
3551
3552#[allow(
3553    clippy::wildcard_enum_match_arm,
3554    reason = "Message is #[non_exhaustive] across the lean-rs-worker-protocol crate boundary; the wildcard arm uniformly rejects any non-handshake frame"
3555)]
3556fn expect_handshake(
3557    stdout: &mut BufReader<ChildStdout>,
3558    max_frame_bytes: u32,
3559) -> Result<LeanWorkerRuntimeMetadata, LeanWorkerError> {
3560    let frame = read_frame(stdout, max_frame_bytes).map_err(|err| {
3561        if err.is_eof() {
3562            LeanWorkerError::Handshake {
3563                message: "child closed stdout before handshake".to_owned(),
3564            }
3565        } else {
3566            LeanWorkerError::Handshake {
3567                message: err.to_string(),
3568            }
3569        }
3570    })?;
3571    match frame.message {
3572        Message::Handshake {
3573            worker_version,
3574            protocol_version,
3575        } if protocol_version == lean_rs_worker_protocol::protocol::PROTOCOL_VERSION => Ok(LeanWorkerRuntimeMetadata {
3576            worker_version,
3577            protocol_version,
3578            lean_version: None,
3579        }),
3580        other => Err(LeanWorkerError::Handshake {
3581            message: format!("unexpected handshake frame: {other:?}"),
3582        }),
3583    }
3584}
3585
3586fn wait_with_stderr(child: &mut Child, stderr: Option<ChildStderr>) -> Result<LeanWorkerExit, LeanWorkerError> {
3587    let status = child.wait().map_err(|source| LeanWorkerError::Wait { source })?;
3588    let mut diagnostics = String::new();
3589    if let Some(mut pipe) = stderr {
3590        drop(pipe.read_to_string(&mut diagnostics));
3591    }
3592    Ok(LeanWorkerExit::from_status(status, diagnostics))
3593}
3594
/// Maximum number of diagnostics bytes that `write_exit` renders (4 KiB).
///
/// The cap exists so that a single `Display` line cannot blow up downstream
/// log/telemetry pipelines that capture `err.to_string()`
/// (`tracing::error!("{err}")`, JSON log shippers). The full
/// `exit.diagnostics` text is still available on the field; this only limits
/// what crosses the `Display` surface.
const DISPLAY_DIAGNOSTICS_MAX_BYTES: usize = 4 * 1024;
3600
3601fn write_exit(f: &mut fmt::Formatter<'_>, prefix: &str, exit: &LeanWorkerExit) -> fmt::Result {
3602    let tail = exit.diagnostics.trim();
3603    if tail.is_empty() {
3604        write!(f, "{prefix} with {}", exit.status)
3605    } else {
3606        let truncated = truncate_for_display(tail, DISPLAY_DIAGNOSTICS_MAX_BYTES);
3607        write!(f, "{prefix} with {}: {truncated}", exit.status)
3608    }
3609}
3610
3611fn import_stats_diagnostic(stats: Option<&LeanWorkerImportStats>) -> String {
3612    let Some(stats) = stats else {
3613        return String::from("last_import_stats=unavailable");
3614    };
3615    format!(
3616        "last_import_stats=available import_profile=level:{} import_all:{} load_exts:{} direct_import_count={} direct_imports={} effective_modules={} compacted_regions={} memory_mapped_regions={} compacted_region_bytes={} memory_mapped_region_bytes={} non_memory_mapped_region_bytes={} imported_constants={} extension_entries={}",
3617        stats.import_level,
3618        stats.import_all,
3619        stats.load_exts,
3620        stats.direct_import_names.len(),
3621        stats.direct_import_names.join(","),
3622        stats.effective_module_count,
3623        stats.compacted_region_count,
3624        stats.memory_mapped_region_count,
3625        stats.compacted_region_bytes,
3626        stats.memory_mapped_region_bytes,
3627        stats.non_memory_mapped_region_bytes,
3628        stats.imported_constant_count,
3629        stats.total_imported_extension_entries
3630    )
3631}
3632
/// Shorten `text` for human display so that at most `max_bytes` bytes of it
/// are kept, then append `… (N bytes truncated)` where `N` is the number of
/// bytes dropped. Text that already fits is returned unchanged, without the
/// marker.
///
/// Two properties of the cut point are guaranteed:
///
/// - it is a UTF-8 char boundary, and
/// - it does not fall inside an unterminated ANSI CSI sequence
///   (`ESC '[' … final byte`). When the last `ESC` in the kept prefix has no
///   final byte before the cut, the prefix is shortened to end just before
///   that `ESC`, because half-open escapes corrupt downstream terminals and
///   log viewers.
fn truncate_for_display(text: &str, max_bytes: usize) -> String {
    let total = text.len();
    if total <= max_bytes {
        return text.to_owned();
    }

    // Largest char boundary not exceeding the cap; index 0 always qualifies.
    let mut end = (0..=max_bytes)
        .rev()
        .find(|&candidate| text.is_char_boundary(candidate))
        .unwrap_or(0);

    let head = text.as_bytes().get(..end).unwrap_or_default();
    if let Some(esc_at) = head.iter().rposition(|&byte| byte == 0x1b) {
        // Skip `ESC` and the byte after it (the `[` introducer), then look for
        // a final byte in 0x40..=0x7e; `[` itself is not counted as one. Only
        // the last `ESC` matters: earlier sequences must have terminated for
        // this one to start.
        let after_introducer = head.get(esc_at.saturating_add(2)..).unwrap_or_default();
        let closed = after_introducer
            .iter()
            .any(|&byte| matches!(byte, 0x40..=0x5a | 0x5c..=0x7e));
        if !closed {
            // `ESC` is ASCII, so its offset is itself a char boundary.
            end = esc_at;
        }
    }

    let kept = text.get(..end).unwrap_or("");
    let truncated_bytes = total.saturating_sub(end);
    format!("{kept}… ({truncated_bytes} bytes truncated)")
}
3678
3679/// Synthesise a degraded module-query batch outcome after a child abort: every
3680/// requested selector reports `BudgetExceeded` so the caller sees an honest
3681/// "could not complete under resource pressure" per id rather than a hard error.
3682fn degraded_query_batch_outcome(
3683    selectors: &[LeanWorkerModuleQuerySelector],
3684    resource: LeanWorkerResourceExhaustedFacts,
3685) -> LeanWorkerModuleQueryBatchOutcome {
3686    let items = selectors
3687        .iter()
3688        .map(|selector| LeanWorkerModuleQueryBatchItem::BudgetExceeded {
3689            id: selector.id().to_owned(),
3690            message: "worker aborted during module query; result degraded under resource pressure".to_owned(),
3691        })
3692        .collect();
3693    LeanWorkerModuleQueryBatchOutcome::Ok {
3694        result: LeanWorkerModuleQueryBatchEnvelope {
3695            items,
3696            total_truncated: false,
3697        },
3698        imports: Vec::new(),
3699        facts: LeanWorkerModuleQueryCacheFacts {
3700            resource: Some(Box::new(resource)),
3701            ..LeanWorkerModuleQueryCacheFacts::uncached(0)
3702        },
3703    }
3704}
3705
3706fn degraded_declaration_verification_batch_result(
3707    request: &LeanWorkerDeclarationVerificationBatchRequest,
3708) -> LeanWorkerDeclarationVerificationBatchResult {
3709    LeanWorkerDeclarationVerificationBatchResult::Ok {
3710        results: request
3711            .targets
3712            .iter()
3713            .map(|target| LeanWorkerDeclarationVerificationBatchRow {
3714                id: target.id.clone(),
3715                target: target.target.clone(),
3716                verification_status: LeanWorkerDeclarationVerificationStatus::BudgetExceeded,
3717                facts: Box::new(LeanWorkerDeclarationVerificationFacts::unavailable()),
3718            })
3719            .collect(),
3720        imports: Vec::new(),
3721    }
3722}
3723
3724fn unexpected_response(operation: &'static str, response: &Response) -> LeanWorkerError {
3725    LeanWorkerError::Protocol {
3726        message: format!("worker sent unexpected {operation} response: {response:?}"),
3727    }
3728}
3729
3730fn stale_worker_output_error(
3731    operation: &'static str,
3732    expected: WorkerGeneration,
3733    actual: WorkerGeneration,
3734) -> LeanWorkerError {
3735    LeanWorkerError::Protocol {
3736        message: format!(
3737            "worker sent stale {operation} frame from generation {}, current generation is {}",
3738            actual.0, expected.0
3739        ),
3740    }
3741}
3742
3743fn stale_worker_request_output_error(
3744    operation: &'static str,
3745    expected: Option<WorkerRequestId>,
3746    actual: WorkerRequestId,
3747) -> LeanWorkerError {
3748    let expected = expected.map_or_else(|| "none".to_owned(), |request_id| request_id.0.to_string());
3749    LeanWorkerError::Protocol {
3750        message: format!(
3751            "worker sent stale {operation} frame from request {}, current request is {expected}",
3752            actual.0
3753        ),
3754    }
3755}
3756
/// Convert `path` to an owned `String`, replacing any non-UTF-8 sequences
/// with U+FFFD.
fn path_string(path: &Path) -> String {
    String::from(path.as_os_str().to_string_lossy())
}
3760
3761fn fixture_capability_manifest(fixture_root: &Path) -> Result<PathBuf, LeanWorkerError> {
3762    let built = lean_toolchain::CargoLeanCapability::new(fixture_root, "LeanRsFixture")
3763        .package("lean_rs_fixture")
3764        .module("LeanRsFixture")
3765        .export_signature(fixture_mul_signature("lean_rs_fixture_u64_mul"))
3766        .export_signature(fixture_panic_signature("lean_rs_fixture_panic_unit"))
3767        .build_quiet()
3768        .map_err(|diagnostic| LeanWorkerError::CapabilityBuild { diagnostic })?;
3769    Ok(built.manifest_path().to_path_buf())
3770}
3771
3772/// Parent-side collector that decodes per-row JSON-string payloads from
3773/// `list_declarations_strings` into an owned `Vec<String>`.
3774///
3775/// Each name lands as its own `Message::DataRow` so the 1 MiB protocol frame
3776/// cap binds per-name (any single Lean name is well under that) rather than
3777/// per-response. Decoding failures or sink panics are surfaced through the
3778/// usual `LeanWorkerError::Protocol` / `DataSinkPanic` paths.
3779#[derive(Debug, Default)]
3780struct DeclarationNameCollector {
3781    names: Mutex<Vec<String>>,
3782    decode_error: Mutex<Option<String>>,
3783}
3784
3785impl DeclarationNameCollector {
3786    fn into_inner(self) -> Vec<String> {
3787        self.names.into_inner().unwrap_or_default()
3788    }
3789}
3790
3791impl LeanWorkerRawDataSink for DeclarationNameCollector {
3792    fn report(&self, row: LeanWorkerRawDataRow) {
3793        match serde_json::from_str::<String>(row.payload.get()) {
3794            Ok(name) => {
3795                if let Ok(mut guard) = self.names.lock() {
3796                    guard.push(name);
3797                }
3798            }
3799            Err(err) => {
3800                if let Ok(mut slot) = self.decode_error.lock()
3801                    && slot.is_none()
3802                {
3803                    *slot = Some(format!("list_declarations_strings row payload decode failed: {err}"));
3804                }
3805            }
3806        }
3807    }
3808}
3809
/// Read the `VmRSS` value for `pid` from `/proc/{pid}/status`.
///
/// Returns `None` when the status file cannot be read or when no `VmRSS:`
/// line with a parseable integer as its first field is present.
#[cfg(target_os = "linux")]
fn child_rss_kib(pid: u32) -> Option<u64> {
    let status = std::fs::read_to_string(format!("/proc/{pid}/status")).ok()?;
    for line in status.lines() {
        let Some(rest) = line.strip_prefix("VmRSS:") else {
            continue;
        };
        // First whitespace-separated field is the number; the rest (the unit
        // suffix) is ignored.
        let parsed = rest
            .split_whitespace()
            .next()
            .and_then(|field| field.parse::<u64>().ok());
        if parsed.is_some() {
            return parsed;
        }
    }
    None
}
3818
/// Ask `ps -o rss= -p {pid}` for the resident set size of `pid`.
///
/// Returns `None` when `ps` cannot be run, exits unsuccessfully, or prints
/// something that is not a positive integer.
#[cfg(not(target_os = "linux"))]
fn child_rss_kib(pid: u32) -> Option<u64> {
    let pid_arg = pid.to_string();
    let output = Command::new("ps")
        .arg("-o")
        .arg("rss=")
        .arg("-p")
        .arg(&pid_arg)
        .output()
        .ok()?;
    if !output.status.success() {
        return None;
    }
    let text = String::from_utf8_lossy(&output.stdout);
    match text.trim().parse::<u64>() {
        // A zero reading is treated as "no sample".
        Ok(kib) if kib > 0 => Some(kib),
        Ok(_) | Err(_) => None,
    }
}
3831
// Unit tests for the supervisor's configuration clamps, stale-frame errors,
// lifecycle snapshot, degraded-result synthesis and `Display` truncation.
// The lint allowances cover `expect`, `panic!` and catch-all match arms, which
// these tests use to report failures.
#[cfg(test)]
#[allow(clippy::expect_used, clippy::panic, clippy::wildcard_enum_match_arm)]
mod tests {
    use super::{
        DISPLAY_DIAGNOSTICS_MAX_BYTES, LeanWorkerConfig, LeanWorkerDeclarationVerificationBatchResult,
        LeanWorkerDeclarationVerificationFacts, LeanWorkerDeclarationVerificationStatus, LeanWorkerError,
        LeanWorkerExit, LeanWorkerLifecycleSnapshot, LeanWorkerModuleQueryBatchItem, LeanWorkerModuleQueryBatchOutcome,
        LeanWorkerModuleQuerySelector, LeanWorkerRestartPolicy, LeanWorkerRestartReason, LeanWorkerStats,
        MAX_FRAME_BYTES, MAX_FRAME_BYTES_HARD_CAP, MIN_FRAME_BYTES, WorkerGeneration, WorkerRequestId,
        stale_worker_output_error, stale_worker_request_output_error, truncate_for_display,
    };
    use lean_rs_worker_protocol::types::{
        LeanWorkerDeclarationVerificationBatchItem, LeanWorkerDeclarationVerificationBatchRequest,
        LeanWorkerDeclarationVerificationTarget, LeanWorkerOutputBudgets, LeanWorkerResourceExhaustedFacts,
        LeanWorkerSorryPolicy,
    };
    use std::path::PathBuf;
    use std::time::Duration;

    /// Config pointing at a child binary that does not exist; these tests
    /// only inspect the config's fields.
    fn dummy_config() -> LeanWorkerConfig {
        LeanWorkerConfig::new(PathBuf::from("/nonexistent/lean-rs-worker-child"))
    }

    /// Exit record with the given diagnostics and a status string matching
    /// exit code 0 (`success`) or 1.
    fn exit_with(diagnostics: &str, success: bool) -> LeanWorkerExit {
        let (code, status) = if success {
            (0_i32, "exit status: 0".to_owned())
        } else {
            (1_i32, "exit status: 1".to_owned())
        };
        LeanWorkerExit {
            success,
            code: Some(code),
            status,
            diagnostics: diagnostics.to_owned(),
        }
    }

    /// Resource facts shared by the degraded module-query batch tests. The
    /// argument list is the one those tests previously spelled out three
    /// times; see `resource_facts` for what each position means.
    fn child_abort_resource() -> LeanWorkerResourceExhaustedFacts {
        super::resource_facts(
            "worker_child_abort",
            true,
            Some("worker_process_module_query_batch"),
            None,
            None,
            Some(1),
            Some(2),
            Some("child_abort".to_owned()),
            None,
            None,
            None,
            None,
            None,
            None,
            None,
            None,
        )
    }

    #[test]
    fn max_frame_bytes_default_matches_legacy_cap() {
        let config = dummy_config();
        assert_eq!(config.max_frame_bytes, MAX_FRAME_BYTES);
    }

    #[test]
    fn max_frame_bytes_clamps_below_floor() {
        let config = dummy_config().max_frame_bytes(1024);
        assert_eq!(config.max_frame_bytes, MIN_FRAME_BYTES);
    }

    #[test]
    fn max_frame_bytes_clamps_above_ceiling() {
        let config = dummy_config().max_frame_bytes(u32::MAX);
        assert_eq!(config.max_frame_bytes, MAX_FRAME_BYTES_HARD_CAP);
    }

    #[test]
    fn max_frame_bytes_passes_through_in_range() {
        let config = dummy_config().max_frame_bytes(8 * 1024 * 1024);
        assert_eq!(config.max_frame_bytes, 8 * 1024 * 1024);
    }

    #[test]
    fn rss_hard_limit_config_clamps_to_nonzero_policy() {
        let config = dummy_config().rss_hard_limit(0, Duration::ZERO);
        assert_eq!(config.rss_hard_limit_kib, Some(1));
        assert_eq!(config.rss_sample_interval, Duration::from_millis(1));
    }

    #[test]
    fn restart_intensity_policy_clamps_to_nonzero_window() {
        let policy = LeanWorkerRestartPolicy::default().max_restarts_per_window(0, Duration::ZERO);
        assert_eq!(policy.restart_intensity.max_restarts, 1);
        assert_eq!(policy.restart_intensity.window, Duration::from_millis(1));
    }

    #[test]
    fn conformance_stale_generation_output_is_protocol_failure() {
        let err = stale_worker_output_error("health", WorkerGeneration(2), WorkerGeneration(1));
        match err {
            LeanWorkerError::Protocol { message } => {
                assert!(message.contains("stale health frame"));
                assert!(message.contains("generation 1"));
                assert!(message.contains("current generation is 2"));
            }
            other => panic!("expected protocol error, got {other:?}"),
        }
    }

    #[test]
    fn conformance_stale_request_output_is_protocol_failure() {
        let err = stale_worker_request_output_error("emit_test_rows", Some(WorkerRequestId(2)), WorkerRequestId(1));
        match err {
            LeanWorkerError::Protocol { message } => {
                assert!(message.contains("stale emit_test_rows frame"));
                assert!(message.contains("request 1"));
                assert!(message.contains("current request is 2"));
            }
            other => panic!("expected protocol error, got {other:?}"),
        }
    }

    #[test]
    fn lifecycle_snapshot_exposes_restart_generation() {
        let stats = LeanWorkerStats {
            restarts: 3,
            exits: 2,
            last_restart_reason: Some(LeanWorkerRestartReason::RequestTimeout {
                operation: "test",
                duration: Duration::from_secs(1),
            }),
            last_rss_kib: Some(42),
            rss_samples_unavailable: 1,
            ..LeanWorkerStats::default()
        };
        let exit = exit_with("bye", false);
        let snapshot = LeanWorkerLifecycleSnapshot::from_worker(&stats, Some(exit.clone()));
        assert_eq!(snapshot.worker_generation, 3);
        assert_eq!(snapshot.restarts, 3);
        assert_eq!(snapshot.exits, 2);
        assert_eq!(snapshot.last_exit, Some(exit));
        assert_eq!(snapshot.last_rss_kib, Some(42));
        assert_eq!(snapshot.rss_samples_unavailable, 1);
    }

    #[test]
    fn restart_reason_exposes_stable_policy_cause() {
        assert_eq!(LeanWorkerRestartReason::Explicit.stable_cause(), "explicit");
        assert_eq!(
            LeanWorkerRestartReason::MaxRequests { limit: 1 }.stable_cause(),
            "max_requests"
        );
        assert_eq!(
            LeanWorkerRestartReason::MaxImports { limit: 1 }.stable_cause(),
            "max_imports"
        );
        assert_eq!(
            LeanWorkerRestartReason::RssCeiling {
                current_kib: 2,
                limit_kib: 1,
                last_import_stats: None,
            }
            .stable_cause(),
            "rss_ceiling"
        );
        assert_eq!(
            LeanWorkerRestartReason::RssHardLimit {
                operation: "test",
                current_kib: 2,
                limit_kib: 1,
                last_import_stats: None,
            }
            .stable_cause(),
            "rss_hard_limit"
        );
        assert_eq!(
            LeanWorkerRestartReason::RequestTimeout {
                operation: "test",
                duration: Duration::from_millis(1),
            }
            .stable_cause(),
            "timeout"
        );
        // The verify/proof-state abort guard surfaces the same `child_abort`
        // cause the parent's relabel heuristic already keys on.
        assert_eq!(
            LeanWorkerRestartReason::ChildAbort { operation: "test" }.stable_cause(),
            "child_abort"
        );
    }

    #[test]
    fn degraded_query_batch_outcome_marks_every_selector_budget_exceeded() {
        let selectors = vec![
            LeanWorkerModuleQuerySelector::ProofState {
                id: "a".to_owned(),
                line: 1,
                column: 1,
            },
            LeanWorkerModuleQuerySelector::Diagnostics { id: "b".to_owned() },
        ];
        let resource = child_abort_resource();
        let LeanWorkerModuleQueryBatchOutcome::Ok { result, imports, facts } =
            super::degraded_query_batch_outcome(&selectors, resource.clone())
        else {
            panic!("degraded outcome should be Ok with per-selector items");
        };
        assert!(imports.is_empty());
        assert_eq!(facts.resource.as_deref(), Some(&resource));
        assert_eq!(result.items.len(), 2);
        // `filter_map` keeps only `BudgetExceeded` items, so a non-degraded item
        // would drop out and the id vector would no longer match.
        let ids: Vec<&str> = result
            .items
            .iter()
            .filter_map(|item| {
                if let LeanWorkerModuleQueryBatchItem::BudgetExceeded { id, .. } = item {
                    Some(id.as_str())
                } else {
                    None
                }
            })
            .collect();
        assert_eq!(ids, vec!["a", "b"]);
    }

    #[test]
    fn declaration_outline_degraded_query_batch_outcome_uses_selector_id() {
        let selectors = vec![LeanWorkerModuleQuerySelector::DeclarationOutline {
            id: "outline".to_owned(),
        }];
        let resource = child_abort_resource();
        let LeanWorkerModuleQueryBatchOutcome::Ok { result, facts, .. } =
            super::degraded_query_batch_outcome(&selectors, resource.clone())
        else {
            panic!("degraded outcome should be Ok with per-selector items");
        };
        assert_eq!(facts.resource.as_deref(), Some(&resource));
        assert!(matches!(
            result.items.as_slice(),
            [LeanWorkerModuleQueryBatchItem::BudgetExceeded { id, .. }] if id == "outline"
        ));
    }

    #[test]
    fn command_message_degraded_query_batch_outcome_uses_diagnostics_selector_id() {
        let selectors = vec![LeanWorkerModuleQuerySelector::Diagnostics {
            id: "messages".to_owned(),
        }];
        let resource = child_abort_resource();
        let LeanWorkerModuleQueryBatchOutcome::Ok { result, facts, .. } =
            super::degraded_query_batch_outcome(&selectors, resource.clone())
        else {
            panic!("degraded outcome should be Ok with per-selector items");
        };
        assert_eq!(facts.resource.as_deref(), Some(&resource));
        assert!(matches!(
            result.items.as_slice(),
            [LeanWorkerModuleQueryBatchItem::BudgetExceeded { id, .. }] if id == "messages"
        ));
    }

    #[test]
    fn declaration_verification_batch_degraded_result_preserves_target_order() {
        let request = LeanWorkerDeclarationVerificationBatchRequest {
            source: "theorem a : True := by trivial\n".to_owned(),
            targets: vec![
                LeanWorkerDeclarationVerificationBatchItem {
                    id: "first".to_owned(),
                    target: LeanWorkerDeclarationVerificationTarget::Name { name: "a".to_owned() },
                },
                LeanWorkerDeclarationVerificationBatchItem {
                    id: "second".to_owned(),
                    target: LeanWorkerDeclarationVerificationTarget::Name { name: "b".to_owned() },
                },
            ],
            sorry_policy: LeanWorkerSorryPolicy::Deny,
            report_axioms: true,
            budgets: LeanWorkerOutputBudgets::default(),
        };
        let LeanWorkerDeclarationVerificationBatchResult::Ok { results, imports } =
            super::degraded_declaration_verification_batch_result(&request)
        else {
            panic!("degraded batch verification should be an Ok batch");
        };
        assert!(imports.is_empty());
        let ids: Vec<&str> = results.iter().map(|row| row.id.as_str()).collect();
        assert_eq!(ids, vec!["first", "second"]);
        assert!(results.iter().all(|row| {
            row.verification_status == LeanWorkerDeclarationVerificationStatus::BudgetExceeded
                && !row.facts.axioms_available
                && row.facts.axioms.is_empty()
        }));
    }

    #[test]
    fn unavailable_verification_facts_report_axioms_uncomputed() {
        let facts = LeanWorkerDeclarationVerificationFacts::unavailable();
        assert!(
            !facts.axioms_available,
            "degraded facts must not claim a computed axiom set"
        );
        assert!(facts.axioms.is_empty());
        assert!(facts.target.is_none());
    }

    #[test]
    fn display_child_panic_or_abort_includes_stderr_tail() {
        let exit = exit_with("could not dlopen X.dylib: image not found", false);
        let err = LeanWorkerError::ChildPanicOrAbort { exit };
        let rendered = err.to_string();
        assert!(rendered.contains("exit status"), "{rendered}");
        assert!(
            rendered.contains("could not dlopen X.dylib: image not found"),
            "{rendered}"
        );
        assert!(rendered.starts_with("worker exited fatally with "), "{rendered}");
    }

    #[test]
    fn display_child_exited_includes_stderr_tail() {
        let exit = exit_with("warning: lean-rs-worker exiting cleanly\n", true);
        let err = LeanWorkerError::ChildExited { exit };
        let rendered = err.to_string();
        assert!(rendered.starts_with("worker exited with "), "{rendered}");
        assert!(
            rendered.contains("warning: lean-rs-worker exiting cleanly"),
            "{rendered}"
        );
    }

    #[test]
    fn display_keeps_terse_format_when_diagnostics_empty() {
        let exit = exit_with("", false);
        let err = LeanWorkerError::ChildPanicOrAbort { exit };
        assert_eq!(err.to_string(), "worker exited fatally with exit status: 1");

        let exit = exit_with("   \n\t  ", true);
        let err = LeanWorkerError::ChildExited { exit };
        assert_eq!(err.to_string(), "worker exited with exit status: 0");
    }

    #[test]
    fn display_truncates_oversized_diagnostics_with_annotation() {
        let large: String = "x".repeat(DISPLAY_DIAGNOSTICS_MAX_BYTES * 2);
        let exit = exit_with(&large, false);
        let err = LeanWorkerError::ChildPanicOrAbort { exit };
        let rendered = err.to_string();
        assert!(rendered.contains("bytes truncated"), "{rendered}");
        // Bound against the cap, not the input: an untruncated rendering is
        // longer than the input and must fail this check. The slack covers
        // the status prefix and the truncation marker.
        assert!(
            rendered.len() < DISPLAY_DIAGNOSTICS_MAX_BYTES + 256,
            "rendered length {} unexpectedly large for cap {}",
            rendered.len(),
            DISPLAY_DIAGNOSTICS_MAX_BYTES
        );
    }

    #[test]
    fn truncate_for_display_returns_input_when_under_cap() {
        let s = "short message";
        assert_eq!(truncate_for_display(s, 1024), s);
    }

    #[test]
    fn truncate_for_display_cuts_at_char_boundary() {
        // 5 copies of `é` (2 bytes each) → 10 bytes total. A cap of 5 falls
        // in the middle of the third `é`, so the cut must back up to byte 4
        // and keep exactly two characters.
        let s = "ééééé";
        let out = truncate_for_display(s, 5);
        let before_marker = out.split('…').next().unwrap_or("");
        assert_eq!(before_marker, "éé", "{out}");
        assert_eq!(out, "éé… (6 bytes truncated)");
    }

    #[test]
    fn truncate_for_display_does_not_split_ansi_csi() {
        // Construct: leading text, then ESC '[' '3' '1' 'm' (red), then more text.
        // Place the CSI so a naive cap lands inside it.
        let mut s = String::from("hello ");
        s.push('\x1b');
        s.push('[');
        s.push('3');
        s.push('1');
        s.push('m');
        s.push_str("RED");
        // Cap chosen to fall between ESC and the terminator `m`.
        let cap = s.find('1').expect("test fixture invariant: '1' present");
        let out = truncate_for_display(&s, cap);
        // The kept prefix must not contain a bare ESC.
        let before_marker = out.split('…').next().unwrap_or("");
        assert!(
            !before_marker.contains('\x1b'),
            "truncated prefix still contains ESC: {before_marker:?}"
        );
        // Everything before the ESC is still kept.
        assert_eq!(before_marker, "hello ", "{out}");
        assert!(out.contains("bytes truncated"), "{out}");
    }

    #[test]
    fn truncate_for_display_keeps_terminated_ansi_csi() {
        // A terminated CSI before the cap should survive truncation intact.
        let mut s = String::from("\x1b[31mRED\x1b[0m ");
        s.push_str(&"x".repeat(64));
        let out = truncate_for_display(&s, 20);
        assert!(out.contains("\x1b[31m"), "{out}");
        assert!(out.contains("bytes truncated"), "{out}");
    }
}