Skip to main content

lean_rs_worker/
supervisor.rs

1use std::ffi::OsString;
2use std::fmt;
3use std::io::{BufReader, BufWriter, Read as _};
4use std::path::{Path, PathBuf};
5use std::process::{Child, ChildStderr, ChildStdin, ChildStdout, Command, ExitStatus, Stdio};
6use std::sync::mpsc;
7use std::thread;
8use std::time::{Duration, Instant};
9
10use crate::capability::LeanWorkerBootstrapDiagnosticCode;
11use crate::protocol::{Message, Request, Response, read_frame, write_frame};
12use crate::session::LeanWorkerDataSinkTarget;
13use crate::session::{
14    LeanWorkerCancellationToken, LeanWorkerCapabilityMetadata, LeanWorkerDataSink, LeanWorkerDiagnosticSink,
15    LeanWorkerDoctorReport, LeanWorkerElabOptions, LeanWorkerElabResult, LeanWorkerKernelResult,
16    LeanWorkerProgressSink, LeanWorkerRawDataSink, LeanWorkerRuntimeMetadata, LeanWorkerSessionConfig,
17    LeanWorkerStreamSummary, check_cancelled, elapsed_event, report_parent_data_row, report_parent_diagnostic,
18    report_parent_progress,
19};
20
/// How long `LeanWorker::spawn` waits for the child's handshake unless the
/// caller overrides it with `LeanWorkerConfig::startup_timeout`.
const DEFAULT_STARTUP_TIMEOUT: Duration = Duration::from_secs(10);
/// Capacity of the bounded worker-event buffer (presumably the queue between
/// the child-output reader and the parent; its users are outside this view).
const WORKER_EVENT_BUFFER_CAPACITY: usize = 64;

/// Default deadline for one worker request after startup.
pub const LEAN_WORKER_REQUEST_TIMEOUT_DEFAULT: Duration = Duration::from_secs(30);

/// Suggested deadline for long-running worker requests (ten minutes).
pub const LEAN_WORKER_REQUEST_TIMEOUT_LONG_RUNNING: Duration = Duration::from_secs(10 * 60);
29
/// Configuration for starting a `lean-rs-worker` child process.
///
/// The executable should be the `lean-rs-worker-child` binary.
///
/// **Worker-child panic policy.** The supervisor spawns every child with
/// three environment defaults that together pin a process boundary around
/// panics:
///
/// - `LEAN_ABORT_ON_PANIC=1` — Lean internal panics terminate the child
///   instead of returning default values, so the parent observes a fatal
///   exit rather than silently-corrupted state.
/// - `LEAN_BACKTRACE=0` — Lean's panic-time backtrace handler is skipped.
///   Since Lean 4.30 that handler calls back into Lean code (the demangler
///   is now `@[export]`'d from `Lean.Compiler.NameDemangling`); a worker
///   child embeds a minimal Lean and cannot guarantee that callback's
///   transitive module dependencies are initialized when user code panics.
///   Disabling the backtrace removes that dependency entirely. See
///   `docs/architecture/06-panic-containment.md` for the boundary argument.
/// - `RUST_BACKTRACE=0` — Rust panics in the child do not print a backtrace.
///
/// All three are safe defaults. Explicit `.env()` entries supplied here are
/// applied after them and therefore override them, in case a caller knows
/// the dependency is satisfied and wants a demangled backtrace on the
/// child's stderr.
#[derive(Clone, Debug)]
pub struct LeanWorkerConfig {
    // Path of the worker child binary handed to `Command::new`.
    executable: PathBuf,
    // Working directory for the child; `None` inherits the parent's.
    current_dir: Option<PathBuf>,
    // Extra child environment, applied in order after the built-in defaults
    // (a later entry for the same key wins).
    env: Vec<(OsString, OsString)>,
    // Deadline for the startup handshake.
    startup_timeout: Duration,
    // Deadline for one request's terminal response.
    request_timeout: Duration,
    // Restart policy checked before requests enter the child.
    restart_policy: LeanWorkerRestartPolicy,
}
60
61impl LeanWorkerConfig {
62    /// Create a worker configuration for a child executable.
63    pub fn new(executable: impl Into<PathBuf>) -> Self {
64        Self {
65            executable: executable.into(),
66            current_dir: None,
67            env: Vec::new(),
68            startup_timeout: DEFAULT_STARTUP_TIMEOUT,
69            request_timeout: LEAN_WORKER_REQUEST_TIMEOUT_DEFAULT,
70            restart_policy: LeanWorkerRestartPolicy::default(),
71        }
72    }
73
74    /// Return the child executable path.
75    pub fn executable(&self) -> &Path {
76        &self.executable
77    }
78
79    /// Set the child working directory.
80    #[must_use]
81    pub fn current_dir(mut self, path: impl Into<PathBuf>) -> Self {
82        self.current_dir = Some(path.into());
83        self
84    }
85
86    /// Add or override one child environment variable.
87    #[must_use]
88    pub fn env(mut self, key: impl Into<OsString>, value: impl Into<OsString>) -> Self {
89        self.env.push((key.into(), value.into()));
90        self
91    }
92
93    /// Set the maximum time to wait for the child handshake.
94    #[must_use]
95    pub fn startup_timeout(mut self, timeout: Duration) -> Self {
96        self.startup_timeout = timeout;
97        self
98    }
99
100    /// Set the maximum time to wait for one request's terminal response.
101    ///
102    /// The request timeout starts after the request frame is written. It covers
103    /// live rows, diagnostics, progress events, and the terminal response. On
104    /// timeout, the supervisor kills and replaces the child process.
105    #[must_use]
106    pub fn request_timeout(mut self, timeout: Duration) -> Self {
107        self.request_timeout = timeout;
108        self
109    }
110
111    /// Use the documented long-running request timeout profile.
112    #[must_use]
113    pub fn long_running_requests(mut self) -> Self {
114        self.request_timeout = LEAN_WORKER_REQUEST_TIMEOUT_LONG_RUNNING;
115        self
116    }
117
118    /// Set the worker restart policy.
119    ///
120    /// Policy checks run before requests enter the child. A policy restart is a
121    /// process restart; it is the only supported reset for Lean process-global
122    /// runtime and import state.
123    #[must_use]
124    pub fn restart_policy(mut self, policy: LeanWorkerRestartPolicy) -> Self {
125        self.restart_policy = policy;
126        self
127    }
128}
129
/// Policy for cycling a worker child before the next request.
///
/// The policy resets retained Lean runtime memory only by restarting the
/// process. It does not change `lean-rs-host`'s in-process memory model, and it
/// does not imply that `SessionPool::drain()` can return process-global Lean
/// memory to the OS.
#[derive(Clone, Debug, Default, Eq, PartialEq)]
pub struct LeanWorkerRestartPolicy {
    // Each limit is `None` when that restart trigger is disabled.
    max_requests: Option<u64>,
    max_imports: Option<u64>,
    max_rss_kib: Option<u64>,
    idle_restart_after: Option<Duration>,
}

impl LeanWorkerRestartPolicy {
    /// Turn a caller-supplied count limit into an enabled limit.
    ///
    /// A limit of zero would be meaningless, so it is raised to one.
    fn enabled_limit(limit: u64) -> Option<u64> {
        Some(std::cmp::max(limit, 1))
    }

    /// Disable automatic policy restarts.
    ///
    /// Every trigger is unset; this equals `LeanWorkerRestartPolicy::default()`.
    #[must_use]
    pub fn disabled() -> Self {
        Self {
            max_requests: None,
            max_imports: None,
            max_rss_kib: None,
            idle_restart_after: None,
        }
    }

    /// Restart before a request once this many requests have entered the child.
    ///
    /// A `limit` of zero is treated as one.
    #[must_use]
    pub fn max_requests(self, limit: u64) -> Self {
        Self {
            max_requests: Self::enabled_limit(limit),
            ..self
        }
    }

    /// Restart before an import-like request once this many imports have run.
    ///
    /// A `limit` of zero is treated as one.
    #[must_use]
    pub fn max_imports(self, limit: u64) -> Self {
        Self {
            max_imports: Self::enabled_limit(limit),
            ..self
        }
    }

    /// Restart before a request when measured child RSS is at least this many KiB.
    ///
    /// A `limit` of zero is treated as one. RSS measurement is best effort: it
    /// is implemented for the current supported Unix development targets, and
    /// unsupported platforms skip the check and increment
    /// `LeanWorkerStats::rss_samples_unavailable`.
    #[must_use]
    pub fn max_rss_kib(self, limit: u64) -> Self {
        Self {
            max_rss_kib: Self::enabled_limit(limit),
            ..self
        }
    }

    /// Restart before a request when the worker has been idle for this long.
    ///
    /// The duration is stored as given (no clamping).
    #[must_use]
    pub fn idle_restart_after(self, duration: Duration) -> Self {
        Self {
            idle_restart_after: Some(duration),
            ..self
        }
    }
}
183
/// Reason recorded for the latest worker cycle.
///
/// `LeanWorkerStats::record_restart` stores the reason in
/// `LeanWorkerStats::last_restart_reason` and bumps the matching per-reason
/// counter.
#[derive(Clone, Debug, Eq, PartialEq)]
pub enum LeanWorkerRestartReason {
    /// The caller explicitly requested a process cycle.
    Explicit,
    /// Request count reached the configured limit before the next request.
    ///
    /// `limit` is the configured limit that was reached.
    MaxRequests { limit: u64 },
    /// Import-like request count reached the configured limit before the next import.
    ///
    /// `limit` is the configured limit that was reached.
    MaxImports { limit: u64 },
    /// Child resident set size reached the configured limit.
    ///
    /// `current_kib` is the sampled RSS and `limit_kib` the configured
    /// ceiling, both in KiB.
    RssCeiling { current_kib: u64, limit_kib: u64 },
    /// Worker was idle at least as long as the configured limit.
    ///
    /// `idle_for` is the observed idle time; `limit` is the configured threshold.
    Idle { idle_for: Duration, limit: Duration },
    /// Parent-side cancellation replaced the child during an in-flight request.
    ///
    /// `operation` names the request that was cancelled.
    Cancelled { operation: &'static str },
    /// Parent-side request timeout replaced the child during an in-flight request.
    ///
    /// `operation` names the request that timed out; `duration` is presumably
    /// the request deadline that elapsed (TODO confirm at the construction site).
    RequestTimeout {
        operation: &'static str,
        duration: Duration,
    },
}
205
/// Snapshot of worker lifecycle counters.
///
/// Restart counters are updated through `record_restart`, which increments
/// `restarts` plus exactly one per-reason counter, saturating instead of
/// wrapping on overflow.
#[derive(Clone, Debug, Default, Eq, PartialEq)]
pub struct LeanWorkerStats {
    /// Requests that entered a worker child.
    pub requests: u64,
    /// Import-like requests that entered a worker child.
    pub imports: u64,
    /// Child exits observed by the supervisor, including policy cycles.
    pub exits: u64,
    /// Restarts performed by the supervisor for any recorded reason: explicit,
    /// policy (requests, imports, RSS, idle), cancellation, or request timeout.
    pub restarts: u64,
    /// Explicit process cycles.
    pub explicit_cycles: u64,
    /// Restarts caused by `LeanWorkerRestartPolicy::max_requests`.
    pub max_request_restarts: u64,
    /// Restarts caused by `LeanWorkerRestartPolicy::max_imports`.
    pub max_import_restarts: u64,
    /// Restarts caused by `LeanWorkerRestartPolicy::max_rss_kib`.
    pub rss_restarts: u64,
    /// Restarts caused by `LeanWorkerRestartPolicy::idle_restart_after`.
    pub idle_restarts: u64,
    /// Restarts caused by parent-side cancellation of an in-flight request.
    pub cancelled_restarts: u64,
    /// Restarts caused by parent-side request timeouts.
    pub timeout_restarts: u64,
    /// RSS checks skipped because the platform did not provide a usable sample.
    pub rss_samples_unavailable: u64,
    /// Last measured child RSS in KiB, when a policy check could sample it.
    pub last_rss_kib: Option<u64>,
    /// Most recent restart reason, if any.
    pub last_restart_reason: Option<LeanWorkerRestartReason>,
    /// Streaming requests that entered a worker child.
    pub stream_requests: u64,
    /// Streaming requests that reached terminal success.
    pub stream_successes: u64,
    /// Streaming requests that failed after entering the child.
    pub stream_failures: u64,
    /// Data rows delivered to parent-side sinks.
    pub data_rows_delivered: u64,
    /// Raw row payload bytes delivered to parent-side sinks.
    pub data_row_payload_bytes: u64,
    /// Total elapsed time spent in streaming requests.
    pub stream_elapsed: Duration,
    /// Times the bounded worker-event reader had to wait for the parent to drain events.
    pub backpressure_waits: u64,
    /// Streaming requests that failed after bounded-buffer backpressure was observed.
    pub backpressure_failures: u64,
}
254
255impl LeanWorkerStats {
256    fn record_restart(&mut self, reason: LeanWorkerRestartReason) {
257        self.restarts = self.restarts.saturating_add(1);
258        match &reason {
259            LeanWorkerRestartReason::Explicit => {
260                self.explicit_cycles = self.explicit_cycles.saturating_add(1);
261            }
262            LeanWorkerRestartReason::MaxRequests { .. } => {
263                self.max_request_restarts = self.max_request_restarts.saturating_add(1);
264            }
265            LeanWorkerRestartReason::MaxImports { .. } => {
266                self.max_import_restarts = self.max_import_restarts.saturating_add(1);
267            }
268            LeanWorkerRestartReason::RssCeiling { .. } => {
269                self.rss_restarts = self.rss_restarts.saturating_add(1);
270            }
271            LeanWorkerRestartReason::Idle { .. } => {
272                self.idle_restarts = self.idle_restarts.saturating_add(1);
273            }
274            LeanWorkerRestartReason::Cancelled { .. } => {
275                self.cancelled_restarts = self.cancelled_restarts.saturating_add(1);
276            }
277            LeanWorkerRestartReason::RequestTimeout { .. } => {
278                self.timeout_restarts = self.timeout_restarts.saturating_add(1);
279            }
280        }
281        self.last_restart_reason = Some(reason);
282    }
283}
284
/// Public lifecycle state for a worker child.
///
/// `Exited` carries the rendered exit status together with any diagnostics
/// captured from the child.
#[derive(Clone, Debug, Eq, PartialEq)]
pub enum LeanWorkerStatus {
    /// The worker process is still running.
    Running,
    /// The worker process has exited.
    Exited(LeanWorkerExit),
}
293
/// Rendered child-process exit information.
///
/// The exit status is captured in plain data (`bool`, `Option<i32>`,
/// `String`), so the value can be cloned, compared, and reported after the
/// `ExitStatus` itself is gone.
#[derive(Clone, Debug, Eq, PartialEq)]
pub struct LeanWorkerExit {
    /// Whether the child process exited successfully.
    pub success: bool,
    /// The platform exit code when one is available.
    pub code: Option<i32>,
    /// The platform-rendered process status.
    pub status: String,
    /// Captured child diagnostics, if available.
    pub diagnostics: String,
}

impl LeanWorkerExit {
    /// Build the rendered form of `status`, attaching the caller-supplied
    /// `diagnostics` text unchanged.
    fn from_status(status: ExitStatus, diagnostics: String) -> Self {
        let success = status.success();
        let code = status.code();
        let rendered = status.to_string();
        Self {
            success,
            code,
            status: rendered,
            diagnostics,
        }
    }
}
317
/// Errors reported by the worker supervisor.
///
/// Only `Spawn`, `Wait`, and `CapabilityBuild` wrap a nested error, which is
/// exposed through `std::error::Error::source`.
#[derive(Debug)]
pub enum LeanWorkerError {
    /// The worker child could not be spawned.
    ///
    /// `executable` is the configured child path and `source` the OS error
    /// returned by the spawn attempt.
    Spawn {
        executable: PathBuf,
        source: std::io::Error,
    },
    /// The default worker child executable could not be resolved.
    WorkerChildUnresolved {
        /// Candidate paths checked by the default resolver.
        tried: Vec<PathBuf>,
    },
    /// The resolved worker child is missing or is not executable.
    WorkerChildNotExecutable { path: PathBuf, reason: String },
    /// Worker bootstrap preflight failed before a real command ran.
    Bootstrap {
        code: LeanWorkerBootstrapDiagnosticCode,
        message: String,
    },
    /// The capability Lake target could not be built.
    CapabilityBuild {
        /// Typed Lake/toolchain diagnostic from `lean-toolchain`.
        diagnostic: lean_toolchain::LinkDiagnostics,
    },
    /// The child process could not be prepared after spawning
    /// (for example, a piped stdio handle was unavailable).
    Setup { message: String },
    /// The child did not complete the startup handshake.
    Handshake { message: String },
    /// The worker protocol failed after the handshake.
    Protocol { message: String },
    /// The child returned a typed worker error.
    Worker { code: String, message: String },
    /// The child exited while a request was in flight.
    ///
    /// `LeanWorker::spawn` also reports this when the child exits with a
    /// success status before completing the handshake.
    ChildExited { exit: LeanWorkerExit },
    /// The child exited fatally while a request was in flight.
    ///
    /// `LeanWorker::spawn` also reports this when the child exits with a
    /// non-success status before completing the handshake.
    ChildPanicOrAbort { exit: LeanWorkerExit },
    /// A worker operation timed out.
    ///
    /// `operation` names the step that timed out (for example `"startup"`),
    /// and `duration` is the deadline that elapsed.
    Timeout {
        operation: &'static str,
        duration: Duration,
    },
    /// A parent-side cancellation token was observed.
    Cancelled { operation: &'static str },
    /// A parent-side progress sink panicked while handling a worker event.
    ProgressPanic { message: String },
    /// A parent-side data sink panicked while handling a worker row.
    DataSinkPanic { message: String },
    /// A parent-side diagnostic sink panicked while handling a worker diagnostic.
    DiagnosticSinkPanic { message: String },
    /// A streaming export returned a nonzero downstream status byte.
    StreamExportFailed { status: u8 },
    /// The in-child string callback helper returned a callback failure status.
    StreamCallbackFailed { status: u8, description: String },
    /// A streaming callback emitted a malformed row envelope.
    StreamRowMalformed { message: String },
    /// A capability metadata export returned malformed JSON.
    CapabilityMetadataMalformed { message: String },
    /// Capability metadata did not match the caller's requested expectation.
    CapabilityMetadataMismatch {
        export: String,
        expected: Box<LeanWorkerCapabilityMetadata>,
        actual: Box<LeanWorkerCapabilityMetadata>,
    },
    /// A capability doctor export returned malformed JSON.
    CapabilityDoctorMalformed { message: String },
    /// A typed command request could not be serialized as JSON.
    TypedCommandRequestEncode { export: String, message: String },
    /// A typed non-streaming command response could not be decoded.
    TypedCommandResponseDecode { export: String, message: String },
    /// A typed streaming command row payload could not be decoded.
    TypedCommandRowDecode {
        export: String,
        stream: String,
        sequence: u64,
        message: String,
    },
    /// A typed streaming command terminal summary could not be decoded.
    TypedCommandSummaryDecode { export: String, message: String },
    /// A pool session lease was invalidated by a worker lifecycle transition.
    LeaseInvalidated { reason: String },
    /// A local worker pool cannot admit another distinct session key.
    WorkerPoolExhausted { max_workers: usize },
    /// A local worker pool cannot admit work without exceeding its RSS budget.
    WorkerPoolMemoryBudgetExceeded { current_kib: u64, limit_kib: u64 },
    /// Waiting for local worker-pool admission exceeded the configured limit.
    WorkerPoolQueueTimeout { waited: Duration },
    /// The public supervisor does not support the requested operation.
    UnsupportedRequest { operation: &'static str },
    /// Waiting for a child process failed.
    Wait { source: std::io::Error },
}
410
impl fmt::Display for LeanWorkerError {
    /// Render a human-readable description of the error.
    ///
    /// Nested causes are interpolated inline where the variant holds one
    /// (`Spawn`, `CapabilityBuild`, `Wait`). For `CapabilityMetadataMismatch`
    /// only the export name is rendered; the `expected`/`actual` metadata is
    /// available through the variant fields.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        match self {
            Self::Spawn { executable, source } => {
                write!(f, "failed to spawn worker {}: {source}", executable.display())
            }
            Self::WorkerChildUnresolved { tried } => {
                // Comma-separated list of every candidate path the resolver checked.
                let tried = tried
                    .iter()
                    .map(|path| path.display().to_string())
                    .collect::<Vec<_>>()
                    .join(", ");
                write!(
                    f,
                    "could not resolve lean-rs-worker-child; set LEAN_RS_WORKER_CHILD or place it beside the current executable (tried: {tried})"
                )
            }
            Self::WorkerChildNotExecutable { path, reason } => {
                write!(f, "worker child '{}' is not executable: {reason}", path.display())
            }
            Self::Bootstrap { code, message } => {
                write!(f, "worker bootstrap check {code} failed: {message}")
            }
            Self::CapabilityBuild { diagnostic } => {
                write!(f, "worker capability Lake target build failed: {diagnostic}")
            }
            Self::Setup { message } => write!(f, "worker child setup failed: {message}"),
            Self::Handshake { message } => write!(f, "worker handshake failed: {message}"),
            Self::Protocol { message } => write!(f, "worker protocol failed: {message}"),
            Self::Worker { code, message } => write!(f, "worker returned {code}: {message}"),
            Self::ChildExited { exit } => write!(f, "worker exited with {}", exit.status),
            Self::ChildPanicOrAbort { exit } => {
                write!(f, "worker exited fatally with {}", exit.status)
            }
            Self::Timeout { operation, duration } => {
                write!(f, "worker operation {operation} timed out after {duration:?}")
            }
            Self::Cancelled { operation } => write!(f, "worker operation {operation} was cancelled"),
            Self::ProgressPanic { message } => write!(f, "worker progress sink panicked: {message}"),
            Self::DataSinkPanic { message } => write!(f, "worker data sink panicked: {message}"),
            Self::DiagnosticSinkPanic { message } => {
                write!(f, "worker diagnostic sink panicked: {message}")
            }
            Self::StreamExportFailed { status } => write!(f, "streaming export returned status {status}"),
            Self::StreamCallbackFailed { status, description } => {
                write!(f, "streaming callback failed with status {status}: {description}")
            }
            Self::StreamRowMalformed { message } => write!(f, "streaming export emitted malformed row: {message}"),
            Self::CapabilityMetadataMalformed { message } => {
                write!(f, "capability metadata export returned malformed JSON: {message}")
            }
            Self::CapabilityMetadataMismatch { export, .. } => {
                write!(f, "capability metadata from {export} did not match expectation")
            }
            Self::CapabilityDoctorMalformed { message } => {
                write!(f, "capability doctor export returned malformed JSON: {message}")
            }
            Self::TypedCommandRequestEncode { export, message } => {
                write!(f, "typed worker command {export} request JSON encode failed: {message}")
            }
            Self::TypedCommandResponseDecode { export, message } => {
                write!(
                    f,
                    "typed worker command {export} response JSON decode failed: {message}"
                )
            }
            Self::TypedCommandRowDecode {
                export,
                stream,
                sequence,
                message,
            } => {
                write!(
                    f,
                    "typed worker command {export} row decode failed at stream {stream} sequence {sequence}: {message}"
                )
            }
            Self::TypedCommandSummaryDecode { export, message } => {
                write!(
                    f,
                    "typed worker command {export} terminal summary decode failed: {message}"
                )
            }
            Self::LeaseInvalidated { reason } => write!(f, "worker pool lease was invalidated: {reason}"),
            Self::WorkerPoolExhausted { max_workers } => {
                write!(
                    f,
                    "worker pool cannot admit another session key; max_workers={max_workers}"
                )
            }
            Self::WorkerPoolMemoryBudgetExceeded { current_kib, limit_kib } => {
                write!(
                    f,
                    "worker pool cannot admit work within RSS budget; current_kib={current_kib} limit_kib={limit_kib}"
                )
            }
            Self::WorkerPoolQueueTimeout { waited } => {
                write!(f, "worker pool admission timed out after {waited:?}")
            }
            Self::UnsupportedRequest { operation } => {
                write!(f, "worker operation {operation} is not supported")
            }
            Self::Wait { source } => write!(f, "failed to wait for worker child: {source}"),
        }
    }
}
517
518impl std::error::Error for LeanWorkerError {
519    fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
520        match self {
521            Self::Spawn { source, .. } | Self::Wait { source } => Some(source),
522            Self::CapabilityBuild { diagnostic } => Some(diagnostic),
523            Self::WorkerChildUnresolved { .. } | Self::WorkerChildNotExecutable { .. } | Self::Bootstrap { .. } => None,
524            Self::Setup { .. }
525            | Self::Handshake { .. }
526            | Self::Protocol { .. }
527            | Self::Worker { .. }
528            | Self::ChildExited { .. }
529            | Self::ChildPanicOrAbort { .. }
530            | Self::Timeout { .. }
531            | Self::Cancelled { .. }
532            | Self::ProgressPanic { .. }
533            | Self::DataSinkPanic { .. }
534            | Self::DiagnosticSinkPanic { .. }
535            | Self::StreamExportFailed { .. }
536            | Self::StreamCallbackFailed { .. }
537            | Self::StreamRowMalformed { .. }
538            | Self::CapabilityMetadataMalformed { .. }
539            | Self::CapabilityMetadataMismatch { .. }
540            | Self::CapabilityDoctorMalformed { .. }
541            | Self::TypedCommandRequestEncode { .. }
542            | Self::TypedCommandResponseDecode { .. }
543            | Self::TypedCommandRowDecode { .. }
544            | Self::TypedCommandSummaryDecode { .. }
545            | Self::LeaseInvalidated { .. }
546            | Self::WorkerPoolExhausted { .. }
547            | Self::WorkerPoolMemoryBudgetExceeded { .. }
548            | Self::WorkerPoolQueueTimeout { .. }
549            | Self::UnsupportedRequest { .. } => None,
550        }
551    }
552}
553
/// Supervisor for one `lean-rs-worker` child process.
///
/// Dropping a live supervisor attempts to terminate the child and then waits
/// for it. Drop never panics; explicit `terminate` is preferred when callers
/// need the exit status.
#[derive(Debug)]
pub struct LeanWorker {
    // Configuration the child was spawned from (cloned in `spawn`).
    config: LeanWorkerConfig,
    // Child process handle; presumably `None` once the child has been
    // reaped — TODO confirm against the terminate/restart paths.
    child: Option<Child>,
    // Buffered request pipe into the child's stdin.
    stdin: Option<BufWriter<ChildStdin>>,
    // Response pipe from the child's stdout; `None` on the handshake-error
    // path in `spawn`, where the reader thread's stdout handle is discarded.
    stdout: Option<BufReader<ChildStdout>>,
    // Child stderr, kept so diagnostics can be captured when the child exits.
    stderr: Option<ChildStderr>,
    // Most recently recorded child exit, if any.
    last_exit: Option<LeanWorkerExit>,
    // Worker/protocol/Lean versions reported by the child's handshake.
    runtime_metadata: LeanWorkerRuntimeMetadata,
    // Lifecycle counters (see `LeanWorkerStats`).
    stats: LeanWorkerStats,
    // Per-process counters, zeroed at spawn; presumably reset on each
    // restart and compared against `LeanWorkerRestartPolicy` limits.
    requests_since_restart: u64,
    imports_since_restart: u64,
    // Timestamp of the latest activity, initialized at spawn; presumably
    // used for the idle-restart check.
    last_activity: Instant,
}
573
574impl LeanWorker {
575    /// Spawn a worker child and wait for its protocol handshake.
576    ///
577    /// # Errors
578    ///
579    /// Returns `LeanWorkerError` if the child cannot be spawned, child setup
580    /// fails, the child exits before handshaking, or the startup timeout
581    /// expires.
582    pub fn spawn(config: &LeanWorkerConfig) -> Result<Self, LeanWorkerError> {
583        let mut command = Command::new(&config.executable);
584        command
585            .stdin(Stdio::piped())
586            .stdout(Stdio::piped())
587            .stderr(Stdio::piped())
588            .env("LEAN_ABORT_ON_PANIC", "1")
589            .env("LEAN_BACKTRACE", "0")
590            .env("RUST_BACKTRACE", "0");
591
592        if let Some(current_dir) = &config.current_dir {
593            command.current_dir(current_dir);
594        }
595        for (key, value) in &config.env {
596            command.env(key, value);
597        }
598
599        let mut child = command.spawn().map_err(|source| LeanWorkerError::Spawn {
600            executable: config.executable.clone(),
601            source,
602        })?;
603
604        let stdin = child
605            .stdin
606            .take()
607            .map(BufWriter::new)
608            .ok_or_else(|| LeanWorkerError::Setup {
609                message: "child stdin unavailable".to_owned(),
610            })?;
611        let stdout = child.stdout.take().ok_or_else(|| LeanWorkerError::Setup {
612            message: "child stdout unavailable".to_owned(),
613        })?;
614        let stderr = child.stderr.take();
615
616        let (sender, receiver) = mpsc::channel();
617        let _handshake_reader = thread::spawn(move || {
618            let mut stdout = BufReader::new(stdout);
619            let result = expect_handshake(&mut stdout);
620            drop(sender.send((stdout, result)));
621        });
622
623        let (stdout, runtime_metadata) = match receiver.recv_timeout(config.startup_timeout) {
624            Ok((stdout, Ok(metadata))) => (stdout, metadata),
625            Ok((_stdout, Err(err))) => {
626                let mut worker = Self {
627                    config: config.clone(),
628                    child: Some(child),
629                    stdin: Some(stdin),
630                    stdout: None,
631                    stderr,
632                    last_exit: None,
633                    runtime_metadata: LeanWorkerRuntimeMetadata {
634                        worker_version: String::new(),
635                        protocol_version: crate::protocol::PROTOCOL_VERSION,
636                        lean_version: None,
637                    },
638                    stats: LeanWorkerStats::default(),
639                    requests_since_restart: 0,
640                    imports_since_restart: 0,
641                    last_activity: Instant::now(),
642                };
643                let exit = worker.try_record_exit();
644                return Err(match exit {
645                    Some(exit) if !exit.success => LeanWorkerError::ChildPanicOrAbort { exit },
646                    Some(exit) => LeanWorkerError::ChildExited { exit },
647                    None => err,
648                });
649            }
650            Err(mpsc::RecvTimeoutError::Timeout) => {
651                drop(child.kill());
652                let _exit = wait_with_stderr(&mut child, stderr)?;
653                return Err(LeanWorkerError::Timeout {
654                    operation: "startup",
655                    duration: config.startup_timeout,
656                });
657            }
658            Err(mpsc::RecvTimeoutError::Disconnected) => {
659                return Err(LeanWorkerError::Handshake {
660                    message: "handshake reader exited without a result".to_owned(),
661                });
662            }
663        };
664
665        Ok(Self {
666            config: config.clone(),
667            child: Some(child),
668            stdin: Some(stdin),
669            stdout: Some(stdout),
670            stderr,
671            last_exit: None,
672            runtime_metadata,
673            stats: LeanWorkerStats::default(),
674            requests_since_restart: 0,
675            imports_since_restart: 0,
676            last_activity: Instant::now(),
677        })
678    }
679
680    /// Check whether the worker responds to requests.
681    ///
682    /// # Errors
683    ///
684    /// Returns `LeanWorkerError` if the worker is dead, the protocol fails, or
685    /// the child returns a typed worker error.
686    pub fn health(&mut self) -> Result<(), LeanWorkerError> {
687        self.prepare_request(false)?;
688        self.send_request(Request::Health)?;
689        self.record_request(false);
690        match self.read_response("health")? {
691            Response::HealthOk => Ok(()),
692            other @ (Response::CapabilityLoaded
693            | Response::U64 { .. }
694            | Response::HostSessionOpened
695            | Response::Elaboration { .. }
696            | Response::KernelCheck { .. }
697            | Response::Strings { .. }
698            | Response::StreamComplete { .. }
699            | Response::StreamExportFailed { .. }
700            | Response::StreamCallbackFailed { .. }
701            | Response::StreamRowMalformed { .. }
702            | Response::CapabilityMetadata { .. }
703            | Response::CapabilityDoctor { .. }
704            | Response::CapabilityMetadataMalformed { .. }
705            | Response::CapabilityDoctorMalformed { .. }
706            | Response::JsonCommand { .. }
707            | Response::RowsComplete { .. }
708            | Response::Terminating
709            | Response::Error { .. }) => Err(unexpected_response("health", &other)),
710        }
711    }
712
713    /// Load the in-tree fixture capability in the worker child.
714    ///
715    /// This is a fixture-only entry point used to exercise the supervisor path
716    /// in tests. The supported public path is `open_session`, which returns the
717    /// host-session adapter instead of expanding this fixture surface.
718    ///
719    /// # Errors
720    ///
721    /// Returns `LeanWorkerError` if the worker is dead, fixture loading fails,
722    /// or protocol communication fails.
723    pub fn load_fixture_capability(&mut self, fixture_root: impl AsRef<Path>) -> Result<(), LeanWorkerError> {
724        self.prepare_request(true)?;
725        self.send_request(Request::LoadFixtureCapability {
726            fixture_root: path_string(fixture_root.as_ref()),
727        })?;
728        self.record_request(true);
729        match self.read_response("load_fixture_capability")? {
730            Response::CapabilityLoaded => Ok(()),
731            other @ (Response::HealthOk
732            | Response::U64 { .. }
733            | Response::HostSessionOpened
734            | Response::Elaboration { .. }
735            | Response::KernelCheck { .. }
736            | Response::Strings { .. }
737            | Response::StreamComplete { .. }
738            | Response::StreamExportFailed { .. }
739            | Response::StreamCallbackFailed { .. }
740            | Response::StreamRowMalformed { .. }
741            | Response::CapabilityMetadata { .. }
742            | Response::CapabilityDoctor { .. }
743            | Response::CapabilityMetadataMalformed { .. }
744            | Response::CapabilityDoctorMalformed { .. }
745            | Response::JsonCommand { .. }
746            | Response::RowsComplete { .. }
747            | Response::Terminating
748            | Response::Error { .. }) => Err(unexpected_response("load_fixture_capability", &other)),
749        }
750    }
751
752    /// Call the prompt fixture multiplication export in the worker child.
753    ///
754    /// # Errors
755    ///
756    /// Returns `LeanWorkerError` if the worker is dead, the export fails, or
757    /// protocol communication fails.
758    pub fn call_fixture_mul(
759        &mut self,
760        fixture_root: impl AsRef<Path>,
761        lhs: u64,
762        rhs: u64,
763    ) -> Result<u64, LeanWorkerError> {
764        self.prepare_request(true)?;
765        self.send_request(Request::CallFixtureMul {
766            fixture_root: path_string(fixture_root.as_ref()),
767            lhs,
768            rhs,
769        })?;
770        self.record_request(true);
771        match self.read_response("call_fixture_mul")? {
772            Response::U64 { value } => Ok(value),
773            other @ (Response::HealthOk
774            | Response::CapabilityLoaded
775            | Response::HostSessionOpened
776            | Response::Elaboration { .. }
777            | Response::KernelCheck { .. }
778            | Response::Strings { .. }
779            | Response::StreamComplete { .. }
780            | Response::StreamExportFailed { .. }
781            | Response::StreamCallbackFailed { .. }
782            | Response::StreamRowMalformed { .. }
783            | Response::CapabilityMetadata { .. }
784            | Response::CapabilityDoctor { .. }
785            | Response::CapabilityMetadataMalformed { .. }
786            | Response::CapabilityDoctorMalformed { .. }
787            | Response::JsonCommand { .. }
788            | Response::RowsComplete { .. }
789            | Response::Terminating
790            | Response::Error { .. }) => Err(unexpected_response("call_fixture_mul", &other)),
791        }
792    }
793
794    /// Return the current worker lifecycle status.
795    ///
796    /// # Errors
797    ///
798    /// Returns `LeanWorkerError` if checking the process status fails.
799    pub fn status(&mut self) -> Result<LeanWorkerStatus, LeanWorkerError> {
800        if let Some(exit) = &self.last_exit {
801            return Ok(LeanWorkerStatus::Exited(exit.clone()));
802        }
803        let Some(child) = self.child.as_mut() else {
804            return Ok(LeanWorkerStatus::Exited(LeanWorkerExit {
805                success: false,
806                code: None,
807                status: "worker is not running".to_owned(),
808                diagnostics: String::new(),
809            }));
810        };
811        match child.try_wait().map_err(|source| LeanWorkerError::Wait { source })? {
812            Some(status) => {
813                let diagnostics = self.read_stderr();
814                let exit = LeanWorkerExit::from_status(status, diagnostics);
815                self.last_exit = Some(exit.clone());
816                self.child = None;
817                self.stdin = None;
818                self.stdout = None;
819                self.stats.exits = self.stats.exits.saturating_add(1);
820                Ok(LeanWorkerStatus::Exited(exit))
821            }
822            None => Ok(LeanWorkerStatus::Running),
823        }
824    }
825
826    /// Return lifecycle counters for this supervisor.
827    #[must_use]
828    pub fn stats(&self) -> LeanWorkerStats {
829        self.stats.clone()
830    }
831
832    /// Return protocol/runtime facts reported by the worker child.
833    #[must_use]
834    pub fn runtime_metadata(&self) -> LeanWorkerRuntimeMetadata {
835        self.runtime_metadata.clone()
836    }
837
838    /// Measure the current child RSS in KiB when supported by the platform.
839    ///
840    /// This is an observability hook for restart policy and memory-cycling
841    /// workloads. A `None` result means the platform did not provide a usable
842    /// sample; it is not a worker failure.
843    pub fn rss_kib(&mut self) -> Option<u64> {
844        match self.child_rss_kib() {
845            Some(value) => {
846                self.stats.last_rss_kib = Some(value);
847                Some(value)
848            }
849            None => {
850                self.stats.rss_samples_unavailable = self.stats.rss_samples_unavailable.saturating_add(1);
851                None
852            }
853        }
854    }
855
856    /// Return the timeout used for subsequent worker requests.
857    #[must_use]
858    pub fn request_timeout(&self) -> Duration {
859        self.config.request_timeout
860    }
861
862    /// Change the timeout for subsequent worker requests.
863    ///
864    /// This changes supervisor policy only. The supervisor still owns the
865    /// deadline, child kill, replacement, and restart accounting.
866    pub fn set_request_timeout(&mut self, timeout: Duration) {
867        self.config.request_timeout = timeout;
868    }
869
870    /// Explicitly cycle the worker process.
871    ///
872    /// This is the manual memory-reset operation. It terminates the current
873    /// child, starts a replacement with the original configuration, and records
874    /// `LeanWorkerRestartReason::Explicit`.
875    ///
876    /// # Errors
877    ///
878    /// Returns `LeanWorkerError` if the existing child cannot be waited on or
879    /// the replacement child cannot be spawned and handshaken.
880    pub fn cycle(&mut self) -> Result<(), LeanWorkerError> {
881        self.restart_with_reason(LeanWorkerRestartReason::Explicit)
882    }
883
884    pub(crate) fn cycle_with_restart_reason(&mut self, reason: LeanWorkerRestartReason) -> Result<(), LeanWorkerError> {
885        self.restart_with_reason(reason)
886    }
887
888    /// Restart this worker using its original configuration.
889    ///
890    /// This is an explicit lifecycle operation. Prompt 58 adds policy-driven
891    /// restarts for memory cycling; this method only gives callers a direct
892    /// reset point.
893    ///
894    /// # Errors
895    ///
896    /// Returns `LeanWorkerError` if the existing child cannot be waited on or
897    /// the replacement child cannot be spawned and handshaken.
898    pub fn restart(&mut self) -> Result<(), LeanWorkerError> {
899        self.cycle()
900    }
901
902    #[doc(hidden)]
903    /// Kill the child process for supervisor tests.
904    ///
905    /// # Errors
906    ///
907    /// Returns `LeanWorkerError` if the worker is already dead or the OS kill
908    /// request fails.
909    pub fn __kill_for_test(&mut self) -> Result<(), LeanWorkerError> {
910        let Some(child) = self.child.as_mut() else {
911            return Err(self.dead_error());
912        };
913        child.kill().map_err(|source| LeanWorkerError::Wait { source })?;
914        Ok(())
915    }
916
917    /// Ask the child to terminate cleanly and wait for it.
918    ///
919    /// # Errors
920    ///
921    /// Returns `LeanWorkerError` if the worker is already dead, the protocol
922    /// fails, or waiting for the child process fails.
923    pub fn terminate(mut self) -> Result<LeanWorkerExit, LeanWorkerError> {
924        self.send_request(Request::Terminate)?;
925        match self.read_response("terminate")? {
926            Response::Terminating => self.wait_for_exit(),
927            other @ (Response::HealthOk
928            | Response::CapabilityLoaded
929            | Response::U64 { .. }
930            | Response::HostSessionOpened
931            | Response::Elaboration { .. }
932            | Response::KernelCheck { .. }
933            | Response::Strings { .. }
934            | Response::StreamComplete { .. }
935            | Response::StreamExportFailed { .. }
936            | Response::StreamCallbackFailed { .. }
937            | Response::StreamRowMalformed { .. }
938            | Response::CapabilityMetadata { .. }
939            | Response::CapabilityDoctor { .. }
940            | Response::CapabilityMetadataMalformed { .. }
941            | Response::CapabilityDoctorMalformed { .. }
942            | Response::JsonCommand { .. }
943            | Response::RowsComplete { .. }
944            | Response::Error { .. }) => Err(unexpected_response("terminate", &other)),
945        }
946    }
947
948    #[doc(hidden)]
949    /// Trigger the prompt fixture panic path.
950    ///
951    /// # Errors
952    ///
953    /// Returns `LeanWorkerError` if the worker does not exit fatally or if the
954    /// protocol fails before the panic path runs.
955    pub fn __trigger_lean_panic_fixture(
956        mut self,
957        fixture_root: impl AsRef<Path>,
958    ) -> Result<LeanWorkerExit, LeanWorkerError> {
959        self.prepare_request(true)?;
960        self.send_request(Request::TriggerLeanPanic {
961            fixture_root: path_string(fixture_root.as_ref()),
962        })?;
963        self.record_request(true);
964        match self.read_response("trigger_lean_panic") {
965            Ok(response) => Err(unexpected_response("trigger_lean_panic", &response)),
966            Err(LeanWorkerError::ChildPanicOrAbort { exit }) => Ok(exit),
967            Err(err) => Err(err),
968        }
969    }
970
971    #[doc(hidden)]
972    /// Emit synthetic worker data rows through the private protocol for row sink tests.
973    ///
974    /// # Errors
975    ///
976    /// Returns `LeanWorkerError` if the worker is dead, the sink panics,
977    /// cancellation is observed, or protocol communication fails.
978    pub fn __emit_test_rows(
979        &mut self,
980        streams: Vec<String>,
981        cancellation: Option<&LeanWorkerCancellationToken>,
982        data: Option<&dyn LeanWorkerDataSink>,
983    ) -> Result<u64, LeanWorkerError> {
984        const OPERATION: &str = "emit_test_rows";
985        check_cancelled(OPERATION, cancellation)?;
986        self.prepare_request(false)?;
987        self.send_request(Request::EmitTestRows { streams })?;
988        self.record_request(false);
989        match self.read_response_with_events(
990            OPERATION,
991            None,
992            cancellation,
993            data.map(LeanWorkerDataSinkTarget::Value),
994            None,
995        )? {
996            Response::RowsComplete { count } => Ok(count),
997            other @ (Response::HealthOk
998            | Response::CapabilityLoaded
999            | Response::U64 { .. }
1000            | Response::HostSessionOpened
1001            | Response::Elaboration { .. }
1002            | Response::KernelCheck { .. }
1003            | Response::Strings { .. }
1004            | Response::StreamComplete { .. }
1005            | Response::StreamExportFailed { .. }
1006            | Response::StreamCallbackFailed { .. }
1007            | Response::StreamRowMalformed { .. }
1008            | Response::CapabilityMetadata { .. }
1009            | Response::CapabilityDoctor { .. }
1010            | Response::CapabilityMetadataMalformed { .. }
1011            | Response::CapabilityDoctorMalformed { .. }
1012            | Response::JsonCommand { .. }
1013            | Response::Terminating
1014            | Response::Error { .. }) => Err(unexpected_response(OPERATION, &other)),
1015        }
1016    }
1017
1018    pub(crate) fn open_worker_session(
1019        &mut self,
1020        config: &LeanWorkerSessionConfig,
1021        cancellation: Option<&LeanWorkerCancellationToken>,
1022        progress: Option<&dyn LeanWorkerProgressSink>,
1023    ) -> Result<(), LeanWorkerError> {
1024        const OPERATION: &str = "open_worker_session";
1025        check_cancelled(OPERATION, cancellation)?;
1026        self.prepare_request(true)?;
1027        self.send_request(Request::OpenHostSession {
1028            project_root: config.project_root_string(),
1029            package: config.package().to_owned(),
1030            lib_name: config.lib_name().to_owned(),
1031            imports: config.imports().to_vec(),
1032        })?;
1033        self.record_request(true);
1034        match self.read_response_with_progress(OPERATION, progress, cancellation)? {
1035            Response::HostSessionOpened => Ok(()),
1036            other @ (Response::HealthOk
1037            | Response::CapabilityLoaded
1038            | Response::U64 { .. }
1039            | Response::Elaboration { .. }
1040            | Response::KernelCheck { .. }
1041            | Response::Strings { .. }
1042            | Response::StreamComplete { .. }
1043            | Response::StreamExportFailed { .. }
1044            | Response::StreamCallbackFailed { .. }
1045            | Response::StreamRowMalformed { .. }
1046            | Response::CapabilityMetadata { .. }
1047            | Response::CapabilityDoctor { .. }
1048            | Response::CapabilityMetadataMalformed { .. }
1049            | Response::CapabilityDoctorMalformed { .. }
1050            | Response::JsonCommand { .. }
1051            | Response::RowsComplete { .. }
1052            | Response::Terminating
1053            | Response::Error { .. }) => Err(unexpected_response(OPERATION, &other)),
1054        }
1055    }
1056
1057    pub(crate) fn worker_elaborate(
1058        &mut self,
1059        source: &str,
1060        options: &LeanWorkerElabOptions,
1061        cancellation: Option<&LeanWorkerCancellationToken>,
1062        progress: Option<&dyn LeanWorkerProgressSink>,
1063    ) -> Result<LeanWorkerElabResult, LeanWorkerError> {
1064        const OPERATION: &str = "worker_elaborate";
1065        check_cancelled(OPERATION, cancellation)?;
1066        self.prepare_request(false)?;
1067        self.send_request(Request::Elaborate {
1068            source: source.to_owned(),
1069            options: options.wire(),
1070        })?;
1071        self.record_request(false);
1072        match self.read_response_with_progress(OPERATION, progress, cancellation)? {
1073            Response::Elaboration { outcome } => Ok(outcome.into()),
1074            other @ (Response::HealthOk
1075            | Response::CapabilityLoaded
1076            | Response::U64 { .. }
1077            | Response::HostSessionOpened
1078            | Response::KernelCheck { .. }
1079            | Response::Strings { .. }
1080            | Response::StreamComplete { .. }
1081            | Response::StreamExportFailed { .. }
1082            | Response::StreamCallbackFailed { .. }
1083            | Response::StreamRowMalformed { .. }
1084            | Response::CapabilityMetadata { .. }
1085            | Response::CapabilityDoctor { .. }
1086            | Response::CapabilityMetadataMalformed { .. }
1087            | Response::CapabilityDoctorMalformed { .. }
1088            | Response::JsonCommand { .. }
1089            | Response::RowsComplete { .. }
1090            | Response::Terminating
1091            | Response::Error { .. }) => Err(unexpected_response(OPERATION, &other)),
1092        }
1093    }
1094
1095    pub(crate) fn worker_kernel_check(
1096        &mut self,
1097        source: &str,
1098        options: &LeanWorkerElabOptions,
1099        cancellation: Option<&LeanWorkerCancellationToken>,
1100        progress: Option<&dyn LeanWorkerProgressSink>,
1101    ) -> Result<LeanWorkerKernelResult, LeanWorkerError> {
1102        const OPERATION: &str = "worker_kernel_check";
1103        check_cancelled(OPERATION, cancellation)?;
1104        self.prepare_request(false)?;
1105        self.send_request(Request::KernelCheck {
1106            source: source.to_owned(),
1107            options: options.wire(),
1108            progress: progress.is_some(),
1109        })?;
1110        self.record_request(false);
1111        match self.read_response_with_progress(OPERATION, progress, cancellation)? {
1112            Response::KernelCheck { outcome } => Ok(outcome.into()),
1113            other @ (Response::HealthOk
1114            | Response::CapabilityLoaded
1115            | Response::U64 { .. }
1116            | Response::HostSessionOpened
1117            | Response::Elaboration { .. }
1118            | Response::Strings { .. }
1119            | Response::StreamComplete { .. }
1120            | Response::StreamExportFailed { .. }
1121            | Response::StreamCallbackFailed { .. }
1122            | Response::StreamRowMalformed { .. }
1123            | Response::CapabilityMetadata { .. }
1124            | Response::CapabilityDoctor { .. }
1125            | Response::CapabilityMetadataMalformed { .. }
1126            | Response::CapabilityDoctorMalformed { .. }
1127            | Response::JsonCommand { .. }
1128            | Response::RowsComplete { .. }
1129            | Response::Terminating
1130            | Response::Error { .. }) => Err(unexpected_response(OPERATION, &other)),
1131        }
1132    }
1133
1134    pub(crate) fn worker_declaration_kinds(
1135        &mut self,
1136        names: &[&str],
1137        cancellation: Option<&LeanWorkerCancellationToken>,
1138        progress: Option<&dyn LeanWorkerProgressSink>,
1139    ) -> Result<Vec<String>, LeanWorkerError> {
1140        const OPERATION: &str = "worker_declaration_kinds";
1141        check_cancelled(OPERATION, cancellation)?;
1142        self.prepare_request(false)?;
1143        self.send_request(Request::DeclarationKinds {
1144            names: names.iter().map(|name| (*name).to_owned()).collect(),
1145            progress: progress.is_some(),
1146        })?;
1147        self.record_request(false);
1148        match self.read_response_with_progress(OPERATION, progress, cancellation)? {
1149            Response::Strings { values } => Ok(values),
1150            other @ (Response::HealthOk
1151            | Response::CapabilityLoaded
1152            | Response::U64 { .. }
1153            | Response::HostSessionOpened
1154            | Response::Elaboration { .. }
1155            | Response::KernelCheck { .. }
1156            | Response::StreamComplete { .. }
1157            | Response::StreamExportFailed { .. }
1158            | Response::StreamCallbackFailed { .. }
1159            | Response::StreamRowMalformed { .. }
1160            | Response::CapabilityMetadata { .. }
1161            | Response::CapabilityDoctor { .. }
1162            | Response::CapabilityMetadataMalformed { .. }
1163            | Response::CapabilityDoctorMalformed { .. }
1164            | Response::JsonCommand { .. }
1165            | Response::RowsComplete { .. }
1166            | Response::Terminating
1167            | Response::Error { .. }) => Err(unexpected_response(OPERATION, &other)),
1168        }
1169    }
1170
1171    pub(crate) fn worker_declaration_names(
1172        &mut self,
1173        names: &[&str],
1174        cancellation: Option<&LeanWorkerCancellationToken>,
1175        progress: Option<&dyn LeanWorkerProgressSink>,
1176    ) -> Result<Vec<String>, LeanWorkerError> {
1177        const OPERATION: &str = "worker_declaration_names";
1178        check_cancelled(OPERATION, cancellation)?;
1179        self.prepare_request(false)?;
1180        self.send_request(Request::DeclarationNames {
1181            names: names.iter().map(|name| (*name).to_owned()).collect(),
1182            progress: progress.is_some(),
1183        })?;
1184        self.record_request(false);
1185        match self.read_response_with_progress(OPERATION, progress, cancellation)? {
1186            Response::Strings { values } => Ok(values),
1187            other @ (Response::HealthOk
1188            | Response::CapabilityLoaded
1189            | Response::U64 { .. }
1190            | Response::HostSessionOpened
1191            | Response::Elaboration { .. }
1192            | Response::KernelCheck { .. }
1193            | Response::StreamComplete { .. }
1194            | Response::StreamExportFailed { .. }
1195            | Response::StreamCallbackFailed { .. }
1196            | Response::StreamRowMalformed { .. }
1197            | Response::CapabilityMetadata { .. }
1198            | Response::CapabilityDoctor { .. }
1199            | Response::CapabilityMetadataMalformed { .. }
1200            | Response::CapabilityDoctorMalformed { .. }
1201            | Response::JsonCommand { .. }
1202            | Response::RowsComplete { .. }
1203            | Response::Terminating
1204            | Response::Error { .. }) => Err(unexpected_response(OPERATION, &other)),
1205        }
1206    }
1207
1208    pub(crate) fn worker_run_data_stream(
1209        &mut self,
1210        export: &str,
1211        request: &serde_json::Value,
1212        rows: &dyn LeanWorkerDataSink,
1213        diagnostics: Option<&dyn LeanWorkerDiagnosticSink>,
1214        cancellation: Option<&LeanWorkerCancellationToken>,
1215        progress: Option<&dyn LeanWorkerProgressSink>,
1216    ) -> Result<LeanWorkerStreamSummary, LeanWorkerError> {
1217        self.worker_run_data_stream_with_sink(
1218            export,
1219            request,
1220            LeanWorkerDataSinkTarget::Value(rows),
1221            diagnostics,
1222            cancellation,
1223            progress,
1224        )
1225    }
1226
1227    pub(crate) fn worker_run_data_stream_raw(
1228        &mut self,
1229        export: &str,
1230        request: &serde_json::Value,
1231        rows: &dyn LeanWorkerRawDataSink,
1232        diagnostics: Option<&dyn LeanWorkerDiagnosticSink>,
1233        cancellation: Option<&LeanWorkerCancellationToken>,
1234        progress: Option<&dyn LeanWorkerProgressSink>,
1235    ) -> Result<LeanWorkerStreamSummary, LeanWorkerError> {
1236        self.worker_run_data_stream_with_sink(
1237            export,
1238            request,
1239            LeanWorkerDataSinkTarget::Raw(rows),
1240            diagnostics,
1241            cancellation,
1242            progress,
1243        )
1244    }
1245
1246    fn worker_run_data_stream_with_sink(
1247        &mut self,
1248        export: &str,
1249        request: &serde_json::Value,
1250        rows: LeanWorkerDataSinkTarget<'_>,
1251        diagnostics: Option<&dyn LeanWorkerDiagnosticSink>,
1252        cancellation: Option<&LeanWorkerCancellationToken>,
1253        progress: Option<&dyn LeanWorkerProgressSink>,
1254    ) -> Result<LeanWorkerStreamSummary, LeanWorkerError> {
1255        const OPERATION: &str = "worker_run_data_stream";
1256        check_cancelled(OPERATION, cancellation)?;
1257        let request_json = serde_json::to_string(request).map_err(|err| LeanWorkerError::Protocol {
1258            message: format!("worker data-stream request JSON encode failed: {err}"),
1259        })?;
1260        self.prepare_request(false)?;
1261        self.send_request(Request::RunDataStream {
1262            export: export.to_owned(),
1263            request_json,
1264            progress: progress.is_some(),
1265        })?;
1266        self.record_request(false);
1267        self.stats.stream_requests = self.stats.stream_requests.saturating_add(1);
1268        match self.read_response_with_events(OPERATION, progress, cancellation, Some(rows), diagnostics)? {
1269            Response::StreamComplete { summary } => Ok(summary.into()),
1270            Response::StreamExportFailed { status_byte } => {
1271                Err(LeanWorkerError::StreamExportFailed { status: status_byte })
1272            }
1273            Response::StreamCallbackFailed {
1274                status_byte,
1275                description,
1276            } => Err(LeanWorkerError::StreamCallbackFailed {
1277                status: status_byte,
1278                description,
1279            }),
1280            Response::StreamRowMalformed { message } => Err(LeanWorkerError::StreamRowMalformed { message }),
1281            other @ (Response::HealthOk
1282            | Response::CapabilityLoaded
1283            | Response::U64 { .. }
1284            | Response::HostSessionOpened
1285            | Response::Elaboration { .. }
1286            | Response::KernelCheck { .. }
1287            | Response::Strings { .. }
1288            | Response::RowsComplete { .. }
1289            | Response::CapabilityMetadata { .. }
1290            | Response::CapabilityDoctor { .. }
1291            | Response::CapabilityMetadataMalformed { .. }
1292            | Response::CapabilityDoctorMalformed { .. }
1293            | Response::JsonCommand { .. }
1294            | Response::Terminating
1295            | Response::Error { .. }) => Err(unexpected_response(OPERATION, &other)),
1296        }
1297    }
1298
1299    pub(crate) fn worker_capability_metadata(
1300        &mut self,
1301        export: &str,
1302        request: &serde_json::Value,
1303        cancellation: Option<&LeanWorkerCancellationToken>,
1304        progress: Option<&dyn LeanWorkerProgressSink>,
1305    ) -> Result<LeanWorkerCapabilityMetadata, LeanWorkerError> {
1306        const OPERATION: &str = "worker_capability_metadata";
1307        check_cancelled(OPERATION, cancellation)?;
1308        let request_json = serde_json::to_string(request).map_err(|err| LeanWorkerError::Protocol {
1309            message: format!("worker capability metadata request JSON encode failed: {err}"),
1310        })?;
1311        self.prepare_request(false)?;
1312        self.send_request(Request::CapabilityMetadata {
1313            export: export.to_owned(),
1314            request_json,
1315        })?;
1316        self.record_request(false);
1317        match self.read_response_with_progress(OPERATION, progress, cancellation)? {
1318            Response::CapabilityMetadata { metadata } => Ok(metadata.into()),
1319            Response::CapabilityMetadataMalformed { message } => {
1320                Err(LeanWorkerError::CapabilityMetadataMalformed { message })
1321            }
1322            other @ (Response::HealthOk
1323            | Response::CapabilityLoaded
1324            | Response::U64 { .. }
1325            | Response::HostSessionOpened
1326            | Response::Elaboration { .. }
1327            | Response::KernelCheck { .. }
1328            | Response::Strings { .. }
1329            | Response::StreamComplete { .. }
1330            | Response::StreamExportFailed { .. }
1331            | Response::StreamCallbackFailed { .. }
1332            | Response::StreamRowMalformed { .. }
1333            | Response::CapabilityDoctor { .. }
1334            | Response::CapabilityDoctorMalformed { .. }
1335            | Response::JsonCommand { .. }
1336            | Response::RowsComplete { .. }
1337            | Response::Terminating
1338            | Response::Error { .. }) => Err(unexpected_response(OPERATION, &other)),
1339        }
1340    }
1341
1342    pub(crate) fn worker_capability_doctor(
1343        &mut self,
1344        export: &str,
1345        request: &serde_json::Value,
1346        cancellation: Option<&LeanWorkerCancellationToken>,
1347        progress: Option<&dyn LeanWorkerProgressSink>,
1348    ) -> Result<LeanWorkerDoctorReport, LeanWorkerError> {
1349        const OPERATION: &str = "worker_capability_doctor";
1350        check_cancelled(OPERATION, cancellation)?;
1351        let request_json = serde_json::to_string(request).map_err(|err| LeanWorkerError::Protocol {
1352            message: format!("worker capability doctor request JSON encode failed: {err}"),
1353        })?;
1354        self.prepare_request(false)?;
1355        self.send_request(Request::CapabilityDoctor {
1356            export: export.to_owned(),
1357            request_json,
1358        })?;
1359        self.record_request(false);
1360        match self.read_response_with_progress(OPERATION, progress, cancellation)? {
1361            Response::CapabilityDoctor { report } => Ok(report.into()),
1362            Response::CapabilityDoctorMalformed { message } => {
1363                Err(LeanWorkerError::CapabilityDoctorMalformed { message })
1364            }
1365            other @ (Response::HealthOk
1366            | Response::CapabilityLoaded
1367            | Response::U64 { .. }
1368            | Response::HostSessionOpened
1369            | Response::Elaboration { .. }
1370            | Response::KernelCheck { .. }
1371            | Response::Strings { .. }
1372            | Response::StreamComplete { .. }
1373            | Response::StreamExportFailed { .. }
1374            | Response::StreamCallbackFailed { .. }
1375            | Response::StreamRowMalformed { .. }
1376            | Response::CapabilityMetadata { .. }
1377            | Response::CapabilityMetadataMalformed { .. }
1378            | Response::JsonCommand { .. }
1379            | Response::RowsComplete { .. }
1380            | Response::Terminating
1381            | Response::Error { .. }) => Err(unexpected_response(OPERATION, &other)),
1382        }
1383    }
1384
1385    pub(crate) fn worker_json_command(
1386        &mut self,
1387        export: &str,
1388        request_json: String,
1389        cancellation: Option<&LeanWorkerCancellationToken>,
1390        progress: Option<&dyn LeanWorkerProgressSink>,
1391    ) -> Result<String, LeanWorkerError> {
1392        const OPERATION: &str = "worker_json_command";
1393        check_cancelled(OPERATION, cancellation)?;
1394        self.prepare_request(false)?;
1395        self.send_request(Request::JsonCommand {
1396            export: export.to_owned(),
1397            request_json,
1398        })?;
1399        self.record_request(false);
1400        match self.read_response_with_progress(OPERATION, progress, cancellation)? {
1401            Response::JsonCommand { response_json } => Ok(response_json),
1402            other @ (Response::HealthOk
1403            | Response::CapabilityLoaded
1404            | Response::U64 { .. }
1405            | Response::HostSessionOpened
1406            | Response::Elaboration { .. }
1407            | Response::KernelCheck { .. }
1408            | Response::Strings { .. }
1409            | Response::StreamComplete { .. }
1410            | Response::StreamExportFailed { .. }
1411            | Response::StreamCallbackFailed { .. }
1412            | Response::StreamRowMalformed { .. }
1413            | Response::CapabilityMetadata { .. }
1414            | Response::CapabilityDoctor { .. }
1415            | Response::CapabilityMetadataMalformed { .. }
1416            | Response::CapabilityDoctorMalformed { .. }
1417            | Response::RowsComplete { .. }
1418            | Response::Terminating
1419            | Response::Error { .. }) => Err(unexpected_response(OPERATION, &other)),
1420        }
1421    }
1422
1423    fn send_request(&mut self, request: Request) -> Result<(), LeanWorkerError> {
1424        self.ensure_running()?;
1425        let Some(stdin) = self.stdin.as_mut() else {
1426            return Err(self.dead_error());
1427        };
1428        write_frame(stdin, Message::Request(request)).map_err(|err| LeanWorkerError::Protocol {
1429            message: err.to_string(),
1430        })
1431    }
1432
1433    fn prepare_request(&mut self, import_like: bool) -> Result<(), LeanWorkerError> {
1434        self.ensure_running()?;
1435
1436        if let Some(limit) = self.config.restart_policy.max_requests
1437            && self.requests_since_restart >= limit
1438        {
1439            return self.restart_with_reason(LeanWorkerRestartReason::MaxRequests { limit });
1440        }
1441
1442        if import_like
1443            && let Some(limit) = self.config.restart_policy.max_imports
1444            && self.imports_since_restart >= limit
1445        {
1446            return self.restart_with_reason(LeanWorkerRestartReason::MaxImports { limit });
1447        }
1448
1449        if let Some(limit_kib) = self.config.restart_policy.max_rss_kib {
1450            match self.child_rss_kib() {
1451                Some(current_kib) if current_kib >= limit_kib => {
1452                    self.stats.last_rss_kib = Some(current_kib);
1453                    return self.restart_with_reason(LeanWorkerRestartReason::RssCeiling { current_kib, limit_kib });
1454                }
1455                Some(current_kib) => {
1456                    self.stats.last_rss_kib = Some(current_kib);
1457                }
1458                None => {
1459                    self.stats.rss_samples_unavailable = self.stats.rss_samples_unavailable.saturating_add(1);
1460                }
1461            }
1462        }
1463
1464        if let Some(limit) = self.config.restart_policy.idle_restart_after {
1465            let idle_for = self.last_activity.elapsed();
1466            if idle_for >= limit {
1467                return self.restart_with_reason(LeanWorkerRestartReason::Idle { idle_for, limit });
1468            }
1469        }
1470
1471        Ok(())
1472    }
1473
1474    fn record_request(&mut self, import_like: bool) {
1475        self.stats.requests = self.stats.requests.saturating_add(1);
1476        self.requests_since_restart = self.requests_since_restart.saturating_add(1);
1477        if import_like {
1478            self.stats.imports = self.stats.imports.saturating_add(1);
1479            self.imports_since_restart = self.imports_since_restart.saturating_add(1);
1480        }
1481        self.last_activity = Instant::now();
1482    }
1483
1484    fn restart_with_reason(&mut self, reason: LeanWorkerRestartReason) -> Result<(), LeanWorkerError> {
1485        let config = self.config.clone();
1486        self.stop_existing_child()?;
1487        self.stats.record_restart(reason);
1488        self.requests_since_restart = 0;
1489        self.imports_since_restart = 0;
1490        let mut next = Self::spawn(&config)?;
1491        next.stats = self.stats.clone();
1492        next.last_activity = Instant::now();
1493        *self = next;
1494        Ok(())
1495    }
1496
1497    fn read_response(&mut self, operation: &'static str) -> Result<Response, LeanWorkerError> {
1498        self.read_response_with_events(operation, None, None, None, None)
1499    }
1500
1501    fn read_response_with_progress(
1502        &mut self,
1503        operation: &'static str,
1504        progress: Option<&dyn LeanWorkerProgressSink>,
1505        cancellation: Option<&LeanWorkerCancellationToken>,
1506    ) -> Result<Response, LeanWorkerError> {
1507        self.read_response_with_events(operation, progress, cancellation, None, None)
1508    }
1509
1510    fn read_response_with_events(
1511        &mut self,
1512        operation: &'static str,
1513        progress: Option<&dyn LeanWorkerProgressSink>,
1514        cancellation: Option<&LeanWorkerCancellationToken>,
1515        data: Option<LeanWorkerDataSinkTarget<'_>>,
1516        diagnostics: Option<&dyn LeanWorkerDiagnosticSink>,
1517    ) -> Result<Response, LeanWorkerError> {
1518        let started = Instant::now();
1519        let timeout = self.config.request_timeout;
1520        let deadline = started.checked_add(timeout);
1521        let streaming = data.is_some();
1522        let mut request_backpressure_waits = 0_u64;
1523        let stdout = self.stdout.take().ok_or_else(|| self.dead_error())?;
1524        let (sender, receiver) = mpsc::sync_channel(WORKER_EVENT_BUFFER_CAPACITY);
1525        let _reader = thread::spawn(move || read_request_messages(stdout, sender));
1526
1527        loop {
1528            let event = match deadline.and_then(|deadline| deadline.checked_duration_since(Instant::now())) {
1529                Some(remaining) if remaining.is_zero() => {
1530                    if streaming {
1531                        self.record_stream_failure(started, request_backpressure_waits);
1532                    }
1533                    self.restart_with_reason(LeanWorkerRestartReason::RequestTimeout {
1534                        operation,
1535                        duration: timeout,
1536                    })?;
1537                    return Err(LeanWorkerError::Timeout {
1538                        operation,
1539                        duration: timeout,
1540                    });
1541                }
1542                Some(remaining) => match receiver.recv_timeout(remaining) {
1543                    Ok(event) => event,
1544                    Err(mpsc::RecvTimeoutError::Timeout) => {
1545                        if streaming {
1546                            self.record_stream_failure(started, request_backpressure_waits);
1547                        }
1548                        self.restart_with_reason(LeanWorkerRestartReason::RequestTimeout {
1549                            operation,
1550                            duration: timeout,
1551                        })?;
1552                        return Err(LeanWorkerError::Timeout {
1553                            operation,
1554                            duration: timeout,
1555                        });
1556                    }
1557                    Err(mpsc::RecvTimeoutError::Disconnected) => {
1558                        return Err(LeanWorkerError::Protocol {
1559                            message: "worker response reader exited without a terminal response".to_owned(),
1560                        });
1561                    }
1562                },
1563                None => match receiver.recv() {
1564                    Ok(event) => event,
1565                    Err(_err) => {
1566                        return Err(LeanWorkerError::Protocol {
1567                            message: "worker response reader exited without a terminal response".to_owned(),
1568                        });
1569                    }
1570                },
1571            };
1572            request_backpressure_waits = request_backpressure_waits.saturating_add(event.backpressure_waits());
1573            self.stats.backpressure_waits = self.stats.backpressure_waits.saturating_add(event.backpressure_waits());
1574
1575            let message = match event {
1576                RequestReaderEvent::Message { message, .. } => message,
1577                RequestReaderEvent::Terminal { message, stdout, .. } => {
1578                    self.stdout = Some(stdout);
1579                    match message {
1580                        Message::Response(Response::Error { code, message }) => {
1581                            if streaming {
1582                                self.record_stream_failure(started, request_backpressure_waits);
1583                            }
1584                            return Err(LeanWorkerError::Worker { code, message });
1585                        }
1586                        Message::Response(response) => {
1587                            if streaming {
1588                                if matches!(response, Response::StreamComplete { .. }) {
1589                                    self.record_stream_success(started);
1590                                } else {
1591                                    self.record_stream_failure(started, request_backpressure_waits);
1592                                }
1593                            }
1594                            return Ok(response);
1595                        }
1596                        other @ (Message::Handshake { .. }
1597                        | Message::Request(_)
1598                        | Message::Diagnostic(_)
1599                        | Message::ProgressTick(_)
1600                        | Message::DataRow(_)
1601                        | Message::FatalExit(_)) => {
1602                            return Err(LeanWorkerError::Protocol {
1603                                message: format!("worker sent unexpected {operation} message: {other:?}"),
1604                            });
1605                        }
1606                    }
1607                }
1608                RequestReaderEvent::ReadError { message, eof, .. } => {
1609                    if streaming {
1610                        self.record_stream_failure(started, request_backpressure_waits);
1611                    }
1612                    return if eof {
1613                        Err(self.record_exit_error())
1614                    } else {
1615                        Err(LeanWorkerError::Protocol { message })
1616                    };
1617                }
1618            };
1619
1620            match message {
1621                Message::ProgressTick(tick) => {
1622                    if let Err(err) =
1623                        report_parent_progress(progress, elapsed_event(tick.phase, tick.current, tick.total, started))
1624                    {
1625                        if streaming {
1626                            self.record_stream_failure(started, request_backpressure_waits);
1627                        }
1628                        return Err(err);
1629                    }
1630                    if cancellation.is_some_and(LeanWorkerCancellationToken::is_cancelled) {
1631                        if streaming {
1632                            self.record_stream_failure(started, request_backpressure_waits);
1633                        }
1634                        self.restart_with_reason(LeanWorkerRestartReason::Cancelled { operation })?;
1635                        return Err(LeanWorkerError::Cancelled { operation });
1636                    }
1637                }
1638                Message::DataRow(row) => {
1639                    let payload_bytes = row.payload.get().len() as u64;
1640                    if let Err(err) = report_parent_data_row(data, row) {
1641                        if streaming {
1642                            self.record_stream_failure(started, request_backpressure_waits);
1643                        }
1644                        return Err(err);
1645                    }
1646                    self.stats.data_rows_delivered = self.stats.data_rows_delivered.saturating_add(1);
1647                    self.stats.data_row_payload_bytes = self.stats.data_row_payload_bytes.saturating_add(payload_bytes);
1648                    if cancellation.is_some_and(LeanWorkerCancellationToken::is_cancelled) {
1649                        if streaming {
1650                            self.record_stream_failure(started, request_backpressure_waits);
1651                        }
1652                        self.restart_with_reason(LeanWorkerRestartReason::Cancelled { operation })?;
1653                        return Err(LeanWorkerError::Cancelled { operation });
1654                    }
1655                }
1656                Message::Diagnostic(diagnostic) => {
1657                    if let Err(err) = report_parent_diagnostic(diagnostics, diagnostic.into()) {
1658                        if streaming {
1659                            self.record_stream_failure(started, request_backpressure_waits);
1660                        }
1661                        return Err(err);
1662                    }
1663                }
1664                Message::Response(response) => return Err(unexpected_response(operation, &response)),
1665                other @ (Message::Handshake { .. } | Message::Request(_) | Message::FatalExit(_)) => {
1666                    return Err(LeanWorkerError::Protocol {
1667                        message: format!("worker sent unexpected {operation} message: {other:?}"),
1668                    });
1669                }
1670            }
1671        }
1672    }
1673
1674    fn ensure_running(&mut self) -> Result<(), LeanWorkerError> {
1675        match self.status()? {
1676            LeanWorkerStatus::Running => Ok(()),
1677            LeanWorkerStatus::Exited(exit) if exit.success => Err(LeanWorkerError::ChildExited { exit }),
1678            LeanWorkerStatus::Exited(exit) => Err(LeanWorkerError::ChildPanicOrAbort { exit }),
1679        }
1680    }
1681
1682    fn record_stream_success(&mut self, started: Instant) {
1683        self.stats.stream_successes = self.stats.stream_successes.saturating_add(1);
1684        self.stats.stream_elapsed = self.stats.stream_elapsed.saturating_add(started.elapsed());
1685    }
1686
1687    fn record_stream_failure(&mut self, started: Instant, backpressure_waits: u64) {
1688        self.stats.stream_failures = self.stats.stream_failures.saturating_add(1);
1689        self.stats.stream_elapsed = self.stats.stream_elapsed.saturating_add(started.elapsed());
1690        if backpressure_waits > 0 {
1691            self.stats.backpressure_failures = self.stats.backpressure_failures.saturating_add(1);
1692        }
1693    }
1694
1695    fn wait_for_exit(&mut self) -> Result<LeanWorkerExit, LeanWorkerError> {
1696        let Some(child) = self.child.as_mut() else {
1697            return Err(self.dead_error());
1698        };
1699        let status = child.wait().map_err(|source| LeanWorkerError::Wait { source })?;
1700        let diagnostics = self.read_stderr();
1701        let exit = LeanWorkerExit::from_status(status, diagnostics);
1702        self.last_exit = Some(exit.clone());
1703        self.child = None;
1704        self.stdin = None;
1705        self.stdout = None;
1706        self.stats.exits = self.stats.exits.saturating_add(1);
1707        Ok(exit)
1708    }
1709
1710    fn try_record_exit(&mut self) -> Option<LeanWorkerExit> {
1711        let child = self.child.as_mut()?;
1712        let status = child.try_wait().ok().flatten()?;
1713        let diagnostics = self.read_stderr();
1714        let exit = LeanWorkerExit::from_status(status, diagnostics);
1715        self.last_exit = Some(exit.clone());
1716        self.child = None;
1717        self.stdin = None;
1718        self.stdout = None;
1719        self.stats.exits = self.stats.exits.saturating_add(1);
1720        Some(exit)
1721    }
1722
1723    fn record_exit_error(&mut self) -> LeanWorkerError {
1724        match self.wait_for_exit() {
1725            Ok(exit) if exit.success => LeanWorkerError::ChildExited { exit },
1726            Ok(exit) => LeanWorkerError::ChildPanicOrAbort { exit },
1727            Err(err) => err,
1728        }
1729    }
1730
1731    fn stop_existing_child(&mut self) -> Result<(), LeanWorkerError> {
1732        if let Some(child) = self.child.as_mut() {
1733            drop(child.kill());
1734            let status = child.wait().map_err(|source| LeanWorkerError::Wait { source })?;
1735            let diagnostics = self.read_stderr();
1736            self.last_exit = Some(LeanWorkerExit::from_status(status, diagnostics));
1737            self.stats.exits = self.stats.exits.saturating_add(1);
1738        }
1739        self.child = None;
1740        self.stdin = None;
1741        self.stdout = None;
1742        Ok(())
1743    }
1744
1745    fn dead_error(&self) -> LeanWorkerError {
1746        let exit = self.last_exit.clone().unwrap_or_else(|| LeanWorkerExit {
1747            success: false,
1748            code: None,
1749            status: "worker is not running".to_owned(),
1750            diagnostics: String::new(),
1751        });
1752        if exit.success {
1753            LeanWorkerError::ChildExited { exit }
1754        } else {
1755            LeanWorkerError::ChildPanicOrAbort { exit }
1756        }
1757    }
1758
1759    fn read_stderr(&mut self) -> String {
1760        let mut diagnostics = String::new();
1761        if let Some(mut pipe) = self.stderr.take() {
1762            drop(pipe.read_to_string(&mut diagnostics));
1763        }
1764        diagnostics
1765    }
1766
1767    fn child_rss_kib(&mut self) -> Option<u64> {
1768        let child = self.child.as_mut()?;
1769        child_rss_kib(child.id())
1770    }
1771}
1772
/// One item sent from the per-request stdout reader thread
/// (`read_request_messages`) to the waiting supervisor.
enum RequestReaderEvent {
    /// A non-`Response` frame read while the request is still in flight.
    Message {
        message: Message,
        // Times the reader found the channel full while delivering this event
        // (`send_reader_event` bumps it at most once).
        backpressure_waits: u64,
    },
    /// The `Response` frame that ends the request. It carries stdout back so
    /// the supervisor can reuse it for the next request.
    Terminal {
        message: Message,
        stdout: BufReader<ChildStdout>,
        // Same meaning as in `Message`.
        backpressure_waits: u64,
    },
    /// `read_frame` failed. The reader thread has stopped and has dropped stdout.
    ReadError {
        // Rendered read error.
        message: String,
        // Whether the failure was end-of-stream, per the error's `is_eof()`.
        eof: bool,
        // Same meaning as in `Message`.
        backpressure_waits: u64,
    },
}
1789
1790impl RequestReaderEvent {
1791    fn backpressure_waits(&self) -> u64 {
1792        match self {
1793            Self::Message { backpressure_waits, .. }
1794            | Self::Terminal { backpressure_waits, .. }
1795            | Self::ReadError { backpressure_waits, .. } => *backpressure_waits,
1796        }
1797    }
1798
1799    fn add_backpressure_wait(&mut self) {
1800        match self {
1801            Self::Message { backpressure_waits, .. }
1802            | Self::Terminal { backpressure_waits, .. }
1803            | Self::ReadError { backpressure_waits, .. } => {
1804                *backpressure_waits = backpressure_waits.saturating_add(1);
1805            }
1806        }
1807    }
1808}
1809
1810#[allow(
1811    clippy::needless_pass_by_value,
1812    reason = "the request reader thread must own the sender"
1813)]
1814fn read_request_messages(mut stdout: BufReader<ChildStdout>, sender: mpsc::SyncSender<RequestReaderEvent>) {
1815    loop {
1816        match read_frame(&mut stdout) {
1817            Ok(frame) if matches!(frame.message, Message::Response(_)) => {
1818                let _ = send_reader_event(
1819                    &sender,
1820                    RequestReaderEvent::Terminal {
1821                        message: frame.message,
1822                        stdout,
1823                        backpressure_waits: 0,
1824                    },
1825                );
1826                return;
1827            }
1828            Ok(frame) => {
1829                if send_reader_event(
1830                    &sender,
1831                    RequestReaderEvent::Message {
1832                        message: frame.message,
1833                        backpressure_waits: 0,
1834                    },
1835                )
1836                .is_err()
1837                {
1838                    return;
1839                }
1840            }
1841            Err(err) => {
1842                let _ = send_reader_event(
1843                    &sender,
1844                    RequestReaderEvent::ReadError {
1845                        message: err.to_string(),
1846                        eof: err.is_eof(),
1847                        backpressure_waits: 0,
1848                    },
1849                );
1850                return;
1851            }
1852        }
1853    }
1854}
1855
1856fn send_reader_event(sender: &mpsc::SyncSender<RequestReaderEvent>, event: RequestReaderEvent) -> Result<(), ()> {
1857    match sender.try_send(event) {
1858        Ok(()) => Ok(()),
1859        Err(mpsc::TrySendError::Full(mut event)) => {
1860            event.add_backpressure_wait();
1861            sender.send(event).map_err(|_| ())
1862        }
1863        Err(mpsc::TrySendError::Disconnected(_event)) => Err(()),
1864    }
1865}
1866
1867impl Drop for LeanWorker {
1868    fn drop(&mut self) {
1869        if let Some(child) = self.child.as_mut() {
1870            drop(child.kill());
1871            drop(child.wait());
1872        }
1873    }
1874}
1875
1876fn expect_handshake(stdout: &mut BufReader<ChildStdout>) -> Result<LeanWorkerRuntimeMetadata, LeanWorkerError> {
1877    let frame = read_frame(stdout).map_err(|err| {
1878        if err.is_eof() {
1879            LeanWorkerError::Handshake {
1880                message: "child closed stdout before handshake".to_owned(),
1881            }
1882        } else {
1883            LeanWorkerError::Handshake {
1884                message: err.to_string(),
1885            }
1886        }
1887    })?;
1888    match frame.message {
1889        Message::Handshake {
1890            worker_version,
1891            protocol_version,
1892        } if protocol_version == crate::protocol::PROTOCOL_VERSION => Ok(LeanWorkerRuntimeMetadata {
1893            worker_version,
1894            protocol_version,
1895            lean_version: None,
1896        }),
1897        other @ (Message::Handshake { .. }
1898        | Message::Request(_)
1899        | Message::Response(_)
1900        | Message::Diagnostic(_)
1901        | Message::ProgressTick(_)
1902        | Message::DataRow(_)
1903        | Message::FatalExit(_)) => Err(LeanWorkerError::Handshake {
1904            message: format!("unexpected handshake frame: {other:?}"),
1905        }),
1906    }
1907}
1908
1909fn wait_with_stderr(child: &mut Child, stderr: Option<ChildStderr>) -> Result<LeanWorkerExit, LeanWorkerError> {
1910    let status = child.wait().map_err(|source| LeanWorkerError::Wait { source })?;
1911    let mut diagnostics = String::new();
1912    if let Some(mut pipe) = stderr {
1913        drop(pipe.read_to_string(&mut diagnostics));
1914    }
1915    Ok(LeanWorkerExit::from_status(status, diagnostics))
1916}
1917
1918fn unexpected_response(operation: &'static str, response: &Response) -> LeanWorkerError {
1919    LeanWorkerError::Protocol {
1920        message: format!("worker sent unexpected {operation} response: {response:?}"),
1921    }
1922}
1923
/// Render `path` as an owned `String`, replacing non-UTF-8 sequences with U+FFFD.
fn path_string(path: &Path) -> String {
    String::from(path.to_string_lossy())
}
1927
#[cfg(target_os = "linux")]
/// Resident set size of process `pid` in KiB, taken from the first parseable
/// `VmRSS:` line of `/proc/<pid>/status`.
///
/// Returns `None` if the file cannot be read or has no such line.
fn child_rss_kib(pid: u32) -> Option<u64> {
    let status_path = format!("/proc/{pid}/status");
    let status = std::fs::read_to_string(status_path).ok()?;
    for line in status.lines() {
        let Some(fields) = line.strip_prefix("VmRSS:") else {
            continue;
        };
        if let Some(kib) = fields.split_whitespace().next().and_then(|value| value.parse::<u64>().ok()) {
            return Some(kib);
        }
    }
    None
}
1936
#[cfg(not(target_os = "linux"))]
/// Resident set size of process `pid` in KiB, as reported by `ps -o rss=`.
///
/// Returns `None` if `ps` cannot be run, exits unsuccessfully, prints
/// something that is not an integer, or reports zero.
fn child_rss_kib(pid: u32) -> Option<u64> {
    let output = Command::new("ps")
        .arg("-o")
        .arg("rss=")
        .arg("-p")
        .arg(pid.to_string())
        .output()
        .ok()?;
    if output.status.success() {
        let reported = String::from_utf8_lossy(&output.stdout);
        reported.trim().parse::<u64>().ok().filter(|&kib| kib != 0)
    } else {
        None
    }
}