// lean_rs_worker/supervisor.rs

1use std::ffi::OsString;
2use std::fmt;
3use std::io::{BufReader, BufWriter, Read as _};
4use std::path::{Path, PathBuf};
5use std::process::{Child, ChildStderr, ChildStdin, ChildStdout, Command, ExitStatus, Stdio};
6use std::sync::mpsc;
7use std::thread;
8use std::time::{Duration, Instant};
9
10use std::sync::Mutex;
11
12use crate::capability::LeanWorkerBootstrapDiagnosticCode;
13use crate::protocol::{Message, Request, Response, read_frame, write_frame};
14use crate::session::LeanWorkerDataSinkTarget;
15use crate::session::{
16    LeanWorkerCancellationToken, LeanWorkerDataSink, LeanWorkerDiagnosticSink, LeanWorkerProgressSink,
17    LeanWorkerRawDataRow, LeanWorkerRawDataSink, LeanWorkerRuntimeMetadata, LeanWorkerSessionConfig,
18    LeanWorkerStreamSummary, check_cancelled, elapsed_event, report_parent_data_row, report_parent_diagnostic,
19    report_parent_progress,
20};
21use crate::types::{
22    LeanWorkerCapabilityMetadata, LeanWorkerDeclarationFilter, LeanWorkerDeclarationRow, LeanWorkerDoctorReport,
23    LeanWorkerElabOptions, LeanWorkerElabResult, LeanWorkerKernelResult, LeanWorkerMetaResult,
24    LeanWorkerMetaTransparency, LeanWorkerProcessFileOutcome, LeanWorkerProcessModuleOutcome, LeanWorkerRendered,
25};
26
/// Handshake deadline used by `LeanWorkerConfig::new` unless overridden with
/// `LeanWorkerConfig::startup_timeout`.
const DEFAULT_STARTUP_TIMEOUT: Duration = Duration::from_secs(10);

/// Capacity of the bounded worker-event buffer.
const WORKER_EVENT_BUFFER_CAPACITY: usize = 64;

/// Default deadline for one worker request after startup.
pub const LEAN_WORKER_REQUEST_TIMEOUT_DEFAULT: Duration = Duration::from_secs(30);

/// Suggested deadline for long-running worker requests (ten minutes).
pub const LEAN_WORKER_REQUEST_TIMEOUT_LONG_RUNNING: Duration = Duration::from_secs(10 * 60);
35
36/// Configuration for starting a `lean-rs-worker` child process.
37///
38/// The executable should be the `lean-rs-worker-child` binary.
39///
40/// **Worker-child panic policy.** The supervisor spawns every child with two
41/// defaults that together pin a process boundary around Lean panics:
42///
43/// - `LEAN_ABORT_ON_PANIC=1` — Lean internal panics terminate the child
44///   instead of returning default values, so the parent observes a fatal
45///   exit rather than silently-corrupted state.
46/// - `LEAN_BACKTRACE=0` — Lean's panic-time backtrace handler is skipped.
47///   Since Lean 4.30 that handler calls back into Lean code (the demangler
48///   is now `@[export]`'d from `Lean.Compiler.NameDemangling`); a worker
49///   child embeds a minimal Lean and cannot guarantee that callback's
50///   transitive module dependencies are initialized when user code panics.
51///   Disabling the backtrace removes that dependency entirely. See
52///   `docs/architecture/06-panic-containment.md` for the boundary argument.
53///
54/// Both are safe defaults — explicit `.env()` entries supplied here override
55/// them, in case a caller knows the dependency is satisfied and wants a
56/// demangled backtrace on the child's stderr.
57#[derive(Clone, Debug)]
58pub struct LeanWorkerConfig {
59    executable: PathBuf,
60    current_dir: Option<PathBuf>,
61    env: Vec<(OsString, OsString)>,
62    startup_timeout: Duration,
63    request_timeout: Duration,
64    restart_policy: LeanWorkerRestartPolicy,
65}
66
67impl LeanWorkerConfig {
68    /// Create a worker configuration for a child executable.
69    pub fn new(executable: impl Into<PathBuf>) -> Self {
70        Self {
71            executable: executable.into(),
72            current_dir: None,
73            env: Vec::new(),
74            startup_timeout: DEFAULT_STARTUP_TIMEOUT,
75            request_timeout: LEAN_WORKER_REQUEST_TIMEOUT_DEFAULT,
76            restart_policy: LeanWorkerRestartPolicy::default(),
77        }
78    }
79
80    /// Return the child executable path.
81    pub fn executable(&self) -> &Path {
82        &self.executable
83    }
84
85    /// Set the child working directory.
86    #[must_use]
87    pub fn current_dir(mut self, path: impl Into<PathBuf>) -> Self {
88        self.current_dir = Some(path.into());
89        self
90    }
91
92    /// Add or override one child environment variable.
93    #[must_use]
94    pub fn env(mut self, key: impl Into<OsString>, value: impl Into<OsString>) -> Self {
95        self.env.push((key.into(), value.into()));
96        self
97    }
98
99    /// Set the maximum time to wait for the child handshake.
100    #[must_use]
101    pub fn startup_timeout(mut self, timeout: Duration) -> Self {
102        self.startup_timeout = timeout;
103        self
104    }
105
106    /// Set the maximum time to wait for one request's terminal response.
107    ///
108    /// The request timeout starts after the request frame is written. It covers
109    /// live rows, diagnostics, progress events, and the terminal response. On
110    /// timeout, the supervisor kills and replaces the child process.
111    #[must_use]
112    pub fn request_timeout(mut self, timeout: Duration) -> Self {
113        self.request_timeout = timeout;
114        self
115    }
116
117    /// Use the documented long-running request timeout profile.
118    #[must_use]
119    pub fn long_running_requests(mut self) -> Self {
120        self.request_timeout = LEAN_WORKER_REQUEST_TIMEOUT_LONG_RUNNING;
121        self
122    }
123
124    /// Set the worker restart policy.
125    ///
126    /// Policy checks run before requests enter the child. A policy restart is a
127    /// process restart; it is the only supported reset for Lean process-global
128    /// runtime and import state.
129    #[must_use]
130    pub fn restart_policy(mut self, policy: LeanWorkerRestartPolicy) -> Self {
131        self.restart_policy = policy;
132        self
133    }
134}
135
/// Policy for cycling a worker child before the next request.
///
/// Restarting the process is the only way this policy reclaims retained Lean
/// runtime memory. It leaves `lean-rs-host`'s in-process memory model
/// unchanged, and it does not imply that `SessionPool::drain()` can hand
/// process-global Lean memory back to the OS.
///
/// Every limit starts unset, which is the same as
/// [`LeanWorkerRestartPolicy::disabled`].
#[derive(Clone, Debug, Default, Eq, PartialEq)]
pub struct LeanWorkerRestartPolicy {
    /// Request-count limit; `None` disables the check.
    max_requests: Option<u64>,
    /// Import-like request-count limit; `None` disables the check.
    max_imports: Option<u64>,
    /// Child RSS ceiling in KiB; `None` disables the check.
    max_rss_kib: Option<u64>,
    /// Idle duration after which the child is cycled; `None` disables the check.
    idle_restart_after: Option<Duration>,
}

impl LeanWorkerRestartPolicy {
    /// A policy that never triggers an automatic restart.
    #[must_use]
    pub fn disabled() -> Self {
        Default::default()
    }

    /// Restart before a request once this many requests have entered the child.
    ///
    /// A `limit` of zero is treated as one.
    #[must_use]
    pub fn max_requests(self, limit: u64) -> Self {
        Self {
            max_requests: Some(limit.max(1)),
            ..self
        }
    }

    /// Restart before an import-like request once this many imports have run.
    ///
    /// A `limit` of zero is treated as one.
    #[must_use]
    pub fn max_imports(self, limit: u64) -> Self {
        Self {
            max_imports: Some(limit.max(1)),
            ..self
        }
    }

    /// Restart before a request when the measured child RSS is at least this many KiB.
    ///
    /// A `limit` of zero is treated as one. RSS measurement is best effort.
    /// It is implemented for the currently supported Unix development targets.
    /// Unsupported platforms skip the check and increment
    /// `LeanWorkerStats::rss_samples_unavailable`.
    #[must_use]
    pub fn max_rss_kib(self, limit: u64) -> Self {
        Self {
            max_rss_kib: Some(limit.max(1)),
            ..self
        }
    }

    /// Restart before a request when the worker has been idle for at least `duration`.
    #[must_use]
    pub fn idle_restart_after(self, duration: Duration) -> Self {
        Self {
            idle_restart_after: Some(duration),
            ..self
        }
    }
}
189
/// Why the supervisor most recently cycled its worker child.
#[derive(Clone, Debug, Eq, PartialEq)]
pub enum LeanWorkerRestartReason {
    /// The caller explicitly requested a process cycle.
    Explicit,
    /// The request count reached the configured limit before the next request.
    MaxRequests {
        /// Configured request-count limit.
        limit: u64,
    },
    /// The import-like request count reached the configured limit before the next import.
    MaxImports {
        /// Configured import-count limit.
        limit: u64,
    },
    /// The child's resident set size reached the configured ceiling.
    RssCeiling {
        /// Measured child RSS in KiB.
        current_kib: u64,
        /// Configured RSS ceiling in KiB.
        limit_kib: u64,
    },
    /// The worker had been idle for at least the configured duration.
    Idle {
        /// How long the worker had been idle.
        idle_for: Duration,
        /// Configured idle limit.
        limit: Duration,
    },
    /// Parent-side cancellation replaced the child during an in-flight request.
    Cancelled {
        /// Name of the cancelled operation.
        operation: &'static str,
    },
    /// A parent-side request timeout replaced the child during an in-flight request.
    RequestTimeout {
        /// Name of the operation that timed out.
        operation: &'static str,
        /// The deadline that elapsed.
        duration: Duration,
    },
}

/// Point-in-time snapshot of worker lifecycle counters.
#[derive(Clone, Debug, Default, Eq, PartialEq)]
pub struct LeanWorkerStats {
    /// Requests that entered a worker child.
    pub requests: u64,
    /// Import-like requests that entered a worker child.
    pub imports: u64,
    /// Child exits seen by the supervisor, policy cycles included.
    pub exits: u64,
    /// Restarts of any kind (policy or explicit) performed by the supervisor.
    pub restarts: u64,
    /// Explicit process cycles.
    pub explicit_cycles: u64,
    /// Restarts triggered by `LeanWorkerRestartPolicy::max_requests`.
    pub max_request_restarts: u64,
    /// Restarts triggered by `LeanWorkerRestartPolicy::max_imports`.
    pub max_import_restarts: u64,
    /// Restarts triggered by `LeanWorkerRestartPolicy::max_rss_kib`.
    pub rss_restarts: u64,
    /// Restarts triggered by `LeanWorkerRestartPolicy::idle_restart_after`.
    pub idle_restarts: u64,
    /// Restarts triggered by parent-side cancellation of an in-flight request.
    pub cancelled_restarts: u64,
    /// Restarts triggered by parent-side request timeouts.
    pub timeout_restarts: u64,
    /// RSS checks skipped because the platform gave no usable sample.
    pub rss_samples_unavailable: u64,
    /// Most recently measured child RSS in KiB, when a policy check could sample it.
    pub last_rss_kib: Option<u64>,
    /// Reason for the latest restart, if one has happened.
    pub last_restart_reason: Option<LeanWorkerRestartReason>,
    /// Streaming requests that entered a worker child.
    pub stream_requests: u64,
    /// Streaming requests that reached terminal success.
    pub stream_successes: u64,
    /// Streaming requests that failed after entering the child.
    pub stream_failures: u64,
    /// Data rows handed to parent-side sinks.
    pub data_rows_delivered: u64,
    /// Raw row payload bytes handed to parent-side sinks.
    pub data_row_payload_bytes: u64,
    /// Cumulative elapsed time spent in streaming requests.
    pub stream_elapsed: Duration,
    /// Times the bounded worker-event reader waited for the parent to drain events.
    pub backpressure_waits: u64,
    /// Streaming requests that failed after bounded-buffer backpressure was observed.
    pub backpressure_failures: u64,
}

impl LeanWorkerStats {
    /// Count one restart.
    ///
    /// Bumps the total and the per-reason counter (both saturating), then
    /// remembers `reason` as the latest restart reason.
    fn record_restart(&mut self, reason: LeanWorkerRestartReason) {
        self.restarts = self.restarts.saturating_add(1);

        let per_reason = match &reason {
            LeanWorkerRestartReason::Explicit => &mut self.explicit_cycles,
            LeanWorkerRestartReason::MaxRequests { .. } => &mut self.max_request_restarts,
            LeanWorkerRestartReason::MaxImports { .. } => &mut self.max_import_restarts,
            LeanWorkerRestartReason::RssCeiling { .. } => &mut self.rss_restarts,
            LeanWorkerRestartReason::Idle { .. } => &mut self.idle_restarts,
            LeanWorkerRestartReason::Cancelled { .. } => &mut self.cancelled_restarts,
            LeanWorkerRestartReason::RequestTimeout { .. } => &mut self.timeout_restarts,
        };
        *per_reason = per_reason.saturating_add(1);

        self.last_restart_reason = Some(reason);
    }
}
290
/// Public lifecycle state for a worker child.
#[derive(Clone, Debug, Eq, PartialEq)]
pub enum LeanWorkerStatus {
    /// The child process is still running.
    Running,
    /// The child process has exited; the payload describes how.
    Exited(LeanWorkerExit),
}

/// Child-process exit information rendered into owned, comparable values.
#[derive(Clone, Debug, Eq, PartialEq)]
pub struct LeanWorkerExit {
    /// `true` when the child exited successfully.
    pub success: bool,
    /// Platform exit code, when the platform reports one.
    pub code: Option<i32>,
    /// Platform rendering of the process status.
    pub status: String,
    /// Child diagnostics captured by the supervisor, if any.
    pub diagnostics: String,
}

impl LeanWorkerExit {
    /// Snapshot `status` together with the caller-captured `diagnostics`.
    fn from_status(status: ExitStatus, diagnostics: String) -> Self {
        let (success, code) = (status.success(), status.code());
        let rendered = format!("{status}");
        Self {
            success,
            code,
            status: rendered,
            diagnostics,
        }
    }
}
323
324/// Errors reported by the worker supervisor.
325#[derive(Debug)]
326pub enum LeanWorkerError {
327    /// The worker child could not be spawned.
328    Spawn {
329        executable: PathBuf,
330        source: std::io::Error,
331    },
332    /// The default worker child executable could not be resolved.
333    WorkerChildUnresolved {
334        /// Candidate paths checked by the default resolver.
335        tried: Vec<PathBuf>,
336    },
337    /// The resolved worker child is missing or is not executable.
338    WorkerChildNotExecutable { path: PathBuf, reason: String },
339    /// Worker bootstrap preflight failed before a real command ran.
340    Bootstrap {
341        code: LeanWorkerBootstrapDiagnosticCode,
342        message: String,
343    },
344    /// The capability Lake target could not be built.
345    CapabilityBuild {
346        /// Typed Lake/toolchain diagnostic from `lean-toolchain`.
347        diagnostic: lean_toolchain::LinkDiagnostics,
348    },
349    /// The child process could not be prepared after spawning.
350    Setup { message: String },
351    /// The child did not complete the startup handshake.
352    Handshake { message: String },
353    /// The worker protocol failed after the handshake.
354    Protocol { message: String },
355    /// The child returned a typed worker error.
356    Worker { code: String, message: String },
357    /// The child exited while a request was in flight.
358    ChildExited { exit: LeanWorkerExit },
359    /// The child exited fatally while a request was in flight.
360    ChildPanicOrAbort { exit: LeanWorkerExit },
361    /// A worker operation timed out.
362    Timeout {
363        operation: &'static str,
364        duration: Duration,
365    },
366    /// A parent-side cancellation token was observed.
367    Cancelled { operation: &'static str },
368    /// A parent-side progress sink panicked while handling a worker event.
369    ProgressPanic { message: String },
370    /// A parent-side data sink panicked while handling a worker row.
371    DataSinkPanic { message: String },
372    /// A parent-side diagnostic sink panicked while handling a worker diagnostic.
373    DiagnosticSinkPanic { message: String },
374    /// A streaming export returned a nonzero downstream status byte.
375    StreamExportFailed { status: u8 },
376    /// The in-child string callback helper returned a callback failure status.
377    StreamCallbackFailed { status: u8, description: String },
378    /// A streaming callback emitted a malformed row envelope.
379    StreamRowMalformed { message: String },
380    /// A capability metadata export returned malformed JSON.
381    CapabilityMetadataMalformed { message: String },
382    /// Capability metadata did not match the caller's requested expectation.
383    CapabilityMetadataMismatch {
384        export: String,
385        expected: Box<LeanWorkerCapabilityMetadata>,
386        actual: Box<LeanWorkerCapabilityMetadata>,
387    },
388    /// A capability doctor export returned malformed JSON.
389    CapabilityDoctorMalformed { message: String },
390    /// A typed command request could not be serialized as JSON.
391    TypedCommandRequestEncode { export: String, message: String },
392    /// A typed non-streaming command response could not be decoded.
393    TypedCommandResponseDecode { export: String, message: String },
394    /// A typed streaming command row payload could not be decoded.
395    TypedCommandRowDecode {
396        export: String,
397        stream: String,
398        sequence: u64,
399        message: String,
400    },
401    /// A typed streaming command terminal summary could not be decoded.
402    TypedCommandSummaryDecode { export: String, message: String },
403    /// A pool session lease was invalidated by a worker lifecycle transition.
404    LeaseInvalidated { reason: String },
405    /// A local worker pool cannot admit another distinct session key.
406    WorkerPoolExhausted { max_workers: usize },
407    /// A local worker pool cannot admit work without exceeding its RSS budget.
408    WorkerPoolMemoryBudgetExceeded { current_kib: u64, limit_kib: u64 },
409    /// Waiting for local worker-pool admission exceeded the configured limit.
410    WorkerPoolQueueTimeout { waited: Duration },
411    /// The public supervisor does not support the requested operation.
412    UnsupportedRequest { operation: &'static str },
413    /// Waiting for a child process failed.
414    Wait { source: std::io::Error },
415}
416
417impl fmt::Display for LeanWorkerError {
418    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
419        match self {
420            Self::Spawn { executable, source } => {
421                write!(f, "failed to spawn worker {}: {source}", executable.display())
422            }
423            Self::WorkerChildUnresolved { tried } => {
424                let tried = tried
425                    .iter()
426                    .map(|path| path.display().to_string())
427                    .collect::<Vec<_>>()
428                    .join(", ");
429                write!(
430                    f,
431                    "could not resolve lean-rs-worker-child; set LEAN_RS_WORKER_CHILD or place it beside the current executable (tried: {tried})"
432                )
433            }
434            Self::WorkerChildNotExecutable { path, reason } => {
435                write!(f, "worker child '{}' is not executable: {reason}", path.display())
436            }
437            Self::Bootstrap { code, message } => {
438                write!(f, "worker bootstrap check {code} failed: {message}")
439            }
440            Self::CapabilityBuild { diagnostic } => {
441                write!(f, "worker capability Lake target build failed: {diagnostic}")
442            }
443            Self::Setup { message } => write!(f, "worker child setup failed: {message}"),
444            Self::Handshake { message } => write!(f, "worker handshake failed: {message}"),
445            Self::Protocol { message } => write!(f, "worker protocol failed: {message}"),
446            Self::Worker { code, message } => write!(f, "worker returned {code}: {message}"),
447            Self::ChildExited { exit } => write!(f, "worker exited with {}", exit.status),
448            Self::ChildPanicOrAbort { exit } => {
449                write!(f, "worker exited fatally with {}", exit.status)
450            }
451            Self::Timeout { operation, duration } => {
452                write!(f, "worker operation {operation} timed out after {duration:?}")
453            }
454            Self::Cancelled { operation } => write!(f, "worker operation {operation} was cancelled"),
455            Self::ProgressPanic { message } => write!(f, "worker progress sink panicked: {message}"),
456            Self::DataSinkPanic { message } => write!(f, "worker data sink panicked: {message}"),
457            Self::DiagnosticSinkPanic { message } => {
458                write!(f, "worker diagnostic sink panicked: {message}")
459            }
460            Self::StreamExportFailed { status } => write!(f, "streaming export returned status {status}"),
461            Self::StreamCallbackFailed { status, description } => {
462                write!(f, "streaming callback failed with status {status}: {description}")
463            }
464            Self::StreamRowMalformed { message } => write!(f, "streaming export emitted malformed row: {message}"),
465            Self::CapabilityMetadataMalformed { message } => {
466                write!(f, "capability metadata export returned malformed JSON: {message}")
467            }
468            Self::CapabilityMetadataMismatch { export, .. } => {
469                write!(f, "capability metadata from {export} did not match expectation")
470            }
471            Self::CapabilityDoctorMalformed { message } => {
472                write!(f, "capability doctor export returned malformed JSON: {message}")
473            }
474            Self::TypedCommandRequestEncode { export, message } => {
475                write!(f, "typed worker command {export} request JSON encode failed: {message}")
476            }
477            Self::TypedCommandResponseDecode { export, message } => {
478                write!(
479                    f,
480                    "typed worker command {export} response JSON decode failed: {message}"
481                )
482            }
483            Self::TypedCommandRowDecode {
484                export,
485                stream,
486                sequence,
487                message,
488            } => {
489                write!(
490                    f,
491                    "typed worker command {export} row decode failed at stream {stream} sequence {sequence}: {message}"
492                )
493            }
494            Self::TypedCommandSummaryDecode { export, message } => {
495                write!(
496                    f,
497                    "typed worker command {export} terminal summary decode failed: {message}"
498                )
499            }
500            Self::LeaseInvalidated { reason } => write!(f, "worker pool lease was invalidated: {reason}"),
501            Self::WorkerPoolExhausted { max_workers } => {
502                write!(
503                    f,
504                    "worker pool cannot admit another session key; max_workers={max_workers}"
505                )
506            }
507            Self::WorkerPoolMemoryBudgetExceeded { current_kib, limit_kib } => {
508                write!(
509                    f,
510                    "worker pool cannot admit work within RSS budget; current_kib={current_kib} limit_kib={limit_kib}"
511                )
512            }
513            Self::WorkerPoolQueueTimeout { waited } => {
514                write!(f, "worker pool admission timed out after {waited:?}")
515            }
516            Self::UnsupportedRequest { operation } => {
517                write!(f, "worker operation {operation} is not supported")
518            }
519            Self::Wait { source } => write!(f, "failed to wait for worker child: {source}"),
520        }
521    }
522}
523
524impl std::error::Error for LeanWorkerError {
525    fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
526        match self {
527            Self::Spawn { source, .. } | Self::Wait { source } => Some(source),
528            Self::CapabilityBuild { diagnostic } => Some(diagnostic),
529            Self::WorkerChildUnresolved { .. } | Self::WorkerChildNotExecutable { .. } | Self::Bootstrap { .. } => None,
530            Self::Setup { .. }
531            | Self::Handshake { .. }
532            | Self::Protocol { .. }
533            | Self::Worker { .. }
534            | Self::ChildExited { .. }
535            | Self::ChildPanicOrAbort { .. }
536            | Self::Timeout { .. }
537            | Self::Cancelled { .. }
538            | Self::ProgressPanic { .. }
539            | Self::DataSinkPanic { .. }
540            | Self::DiagnosticSinkPanic { .. }
541            | Self::StreamExportFailed { .. }
542            | Self::StreamCallbackFailed { .. }
543            | Self::StreamRowMalformed { .. }
544            | Self::CapabilityMetadataMalformed { .. }
545            | Self::CapabilityMetadataMismatch { .. }
546            | Self::CapabilityDoctorMalformed { .. }
547            | Self::TypedCommandRequestEncode { .. }
548            | Self::TypedCommandResponseDecode { .. }
549            | Self::TypedCommandRowDecode { .. }
550            | Self::TypedCommandSummaryDecode { .. }
551            | Self::LeaseInvalidated { .. }
552            | Self::WorkerPoolExhausted { .. }
553            | Self::WorkerPoolMemoryBudgetExceeded { .. }
554            | Self::WorkerPoolQueueTimeout { .. }
555            | Self::UnsupportedRequest { .. } => None,
556        }
557    }
558}
559
560/// Supervisor for one `lean-rs-worker` child process.
561///
562/// Dropping a live supervisor attempts to terminate the child and then waits
563/// for it. Drop never panics; explicit `terminate` is preferred when callers
564/// need the exit status.
565#[derive(Debug)]
566pub struct LeanWorker {
567    config: LeanWorkerConfig,
568    child: Option<Child>,
569    stdin: Option<BufWriter<ChildStdin>>,
570    stdout: Option<BufReader<ChildStdout>>,
571    stderr: Option<ChildStderr>,
572    last_exit: Option<LeanWorkerExit>,
573    runtime_metadata: LeanWorkerRuntimeMetadata,
574    stats: LeanWorkerStats,
575    requests_since_restart: u64,
576    imports_since_restart: u64,
577    last_activity: Instant,
578}
579
580impl LeanWorker {
581    /// Spawn a worker child and wait for its protocol handshake.
582    ///
583    /// # Errors
584    ///
585    /// Returns `LeanWorkerError` if the child cannot be spawned, child setup
586    /// fails, the child exits before handshaking, or the startup timeout
587    /// expires.
588    pub fn spawn(config: &LeanWorkerConfig) -> Result<Self, LeanWorkerError> {
589        let mut command = Command::new(&config.executable);
590        command
591            .stdin(Stdio::piped())
592            .stdout(Stdio::piped())
593            .stderr(Stdio::piped())
594            .env("LEAN_ABORT_ON_PANIC", "1")
595            .env("LEAN_BACKTRACE", "0")
596            .env("RUST_BACKTRACE", "0");
597
598        if let Some(current_dir) = &config.current_dir {
599            command.current_dir(current_dir);
600        }
601        for (key, value) in &config.env {
602            command.env(key, value);
603        }
604
605        let mut child = command.spawn().map_err(|source| LeanWorkerError::Spawn {
606            executable: config.executable.clone(),
607            source,
608        })?;
609
610        let stdin = child
611            .stdin
612            .take()
613            .map(BufWriter::new)
614            .ok_or_else(|| LeanWorkerError::Setup {
615                message: "child stdin unavailable".to_owned(),
616            })?;
617        let stdout = child.stdout.take().ok_or_else(|| LeanWorkerError::Setup {
618            message: "child stdout unavailable".to_owned(),
619        })?;
620        let stderr = child.stderr.take();
621
622        let (sender, receiver) = mpsc::channel();
623        let _handshake_reader = thread::spawn(move || {
624            let mut stdout = BufReader::new(stdout);
625            let result = expect_handshake(&mut stdout);
626            drop(sender.send((stdout, result)));
627        });
628
629        let (stdout, runtime_metadata) = match receiver.recv_timeout(config.startup_timeout) {
630            Ok((stdout, Ok(metadata))) => (stdout, metadata),
631            Ok((_stdout, Err(err))) => {
632                let mut worker = Self {
633                    config: config.clone(),
634                    child: Some(child),
635                    stdin: Some(stdin),
636                    stdout: None,
637                    stderr,
638                    last_exit: None,
639                    runtime_metadata: LeanWorkerRuntimeMetadata {
640                        worker_version: String::new(),
641                        protocol_version: crate::protocol::PROTOCOL_VERSION,
642                        lean_version: None,
643                    },
644                    stats: LeanWorkerStats::default(),
645                    requests_since_restart: 0,
646                    imports_since_restart: 0,
647                    last_activity: Instant::now(),
648                };
649                let exit = worker.try_record_exit();
650                return Err(match exit {
651                    Some(exit) if !exit.success => LeanWorkerError::ChildPanicOrAbort { exit },
652                    Some(exit) => LeanWorkerError::ChildExited { exit },
653                    None => err,
654                });
655            }
656            Err(mpsc::RecvTimeoutError::Timeout) => {
657                drop(child.kill());
658                let _exit = wait_with_stderr(&mut child, stderr)?;
659                return Err(LeanWorkerError::Timeout {
660                    operation: "startup",
661                    duration: config.startup_timeout,
662                });
663            }
664            Err(mpsc::RecvTimeoutError::Disconnected) => {
665                return Err(LeanWorkerError::Handshake {
666                    message: "handshake reader exited without a result".to_owned(),
667                });
668            }
669        };
670
671        Ok(Self {
672            config: config.clone(),
673            child: Some(child),
674            stdin: Some(stdin),
675            stdout: Some(stdout),
676            stderr,
677            last_exit: None,
678            runtime_metadata,
679            stats: LeanWorkerStats::default(),
680            requests_since_restart: 0,
681            imports_since_restart: 0,
682            last_activity: Instant::now(),
683        })
684    }
685
686    /// Check whether the worker responds to requests.
687    ///
688    /// # Errors
689    ///
690    /// Returns `LeanWorkerError` if the worker is dead, the protocol fails, or
691    /// the child returns a typed worker error.
692    pub fn health(&mut self) -> Result<(), LeanWorkerError> {
693        self.prepare_request(false)?;
694        self.send_request(Request::Health)?;
695        self.record_request(false);
696        match self.read_response("health")? {
697            Response::HealthOk => Ok(()),
698            other @ (Response::CapabilityLoaded
699            | Response::U64 { .. }
700            | Response::HostSessionOpened
701            | Response::Elaboration { .. }
702            | Response::KernelCheck { .. }
703            | Response::Strings { .. }
704            | Response::StreamComplete { .. }
705            | Response::StreamExportFailed { .. }
706            | Response::StreamCallbackFailed { .. }
707            | Response::StreamRowMalformed { .. }
708            | Response::CapabilityMetadata { .. }
709            | Response::CapabilityDoctor { .. }
710            | Response::CapabilityMetadataMalformed { .. }
711            | Response::CapabilityDoctorMalformed { .. }
712            | Response::JsonCommand { .. }
713            | Response::MetaExpr { .. }
714            | Response::MetaBool { .. }
715            | Response::Declaration { .. }
716            | Response::DeclarationBulk { .. }
717            | Response::ProcessFile { .. }
718            | Response::ProcessModule { .. }
719            | Response::RowsComplete { .. }
720            | Response::Terminating
721            | Response::Error { .. }) => Err(unexpected_response("health", &other)),
722        }
723    }
724
725    /// Load the in-tree fixture capability in the worker child.
726    ///
727    /// This is a fixture-only entry point used to exercise the supervisor path
728    /// in tests. The supported public path is `open_session`, which returns the
729    /// host-session adapter instead of expanding this fixture surface.
730    ///
731    /// # Errors
732    ///
733    /// Returns `LeanWorkerError` if the worker is dead, fixture loading fails,
734    /// or protocol communication fails.
735    pub fn load_fixture_capability(&mut self, fixture_root: impl AsRef<Path>) -> Result<(), LeanWorkerError> {
736        self.prepare_request(true)?;
737        self.send_request(Request::LoadFixtureCapability {
738            fixture_root: path_string(fixture_root.as_ref()),
739        })?;
740        self.record_request(true);
741        match self.read_response("load_fixture_capability")? {
742            Response::CapabilityLoaded => Ok(()),
743            other @ (Response::HealthOk
744            | Response::U64 { .. }
745            | Response::HostSessionOpened
746            | Response::Elaboration { .. }
747            | Response::KernelCheck { .. }
748            | Response::Strings { .. }
749            | Response::StreamComplete { .. }
750            | Response::StreamExportFailed { .. }
751            | Response::StreamCallbackFailed { .. }
752            | Response::StreamRowMalformed { .. }
753            | Response::CapabilityMetadata { .. }
754            | Response::CapabilityDoctor { .. }
755            | Response::CapabilityMetadataMalformed { .. }
756            | Response::CapabilityDoctorMalformed { .. }
757            | Response::JsonCommand { .. }
758            | Response::MetaExpr { .. }
759            | Response::MetaBool { .. }
760            | Response::Declaration { .. }
761            | Response::DeclarationBulk { .. }
762            | Response::ProcessFile { .. }
763            | Response::ProcessModule { .. }
764            | Response::RowsComplete { .. }
765            | Response::Terminating
766            | Response::Error { .. }) => Err(unexpected_response("load_fixture_capability", &other)),
767        }
768    }
769
770    /// Call the prompt fixture multiplication export in the worker child.
771    ///
772    /// # Errors
773    ///
774    /// Returns `LeanWorkerError` if the worker is dead, the export fails, or
775    /// protocol communication fails.
776    pub fn call_fixture_mul(
777        &mut self,
778        fixture_root: impl AsRef<Path>,
779        lhs: u64,
780        rhs: u64,
781    ) -> Result<u64, LeanWorkerError> {
782        self.prepare_request(true)?;
783        self.send_request(Request::CallFixtureMul {
784            fixture_root: path_string(fixture_root.as_ref()),
785            lhs,
786            rhs,
787        })?;
788        self.record_request(true);
789        match self.read_response("call_fixture_mul")? {
790            Response::U64 { value } => Ok(value),
791            other @ (Response::HealthOk
792            | Response::CapabilityLoaded
793            | Response::HostSessionOpened
794            | Response::Elaboration { .. }
795            | Response::KernelCheck { .. }
796            | Response::Strings { .. }
797            | Response::StreamComplete { .. }
798            | Response::StreamExportFailed { .. }
799            | Response::StreamCallbackFailed { .. }
800            | Response::StreamRowMalformed { .. }
801            | Response::CapabilityMetadata { .. }
802            | Response::CapabilityDoctor { .. }
803            | Response::CapabilityMetadataMalformed { .. }
804            | Response::CapabilityDoctorMalformed { .. }
805            | Response::JsonCommand { .. }
806            | Response::MetaExpr { .. }
807            | Response::MetaBool { .. }
808            | Response::Declaration { .. }
809            | Response::DeclarationBulk { .. }
810            | Response::ProcessFile { .. }
811            | Response::ProcessModule { .. }
812            | Response::RowsComplete { .. }
813            | Response::Terminating
814            | Response::Error { .. }) => Err(unexpected_response("call_fixture_mul", &other)),
815        }
816    }
817
818    /// Return the current worker lifecycle status.
819    ///
820    /// # Errors
821    ///
822    /// Returns `LeanWorkerError` if checking the process status fails.
823    pub fn status(&mut self) -> Result<LeanWorkerStatus, LeanWorkerError> {
824        if let Some(exit) = &self.last_exit {
825            return Ok(LeanWorkerStatus::Exited(exit.clone()));
826        }
827        let Some(child) = self.child.as_mut() else {
828            return Ok(LeanWorkerStatus::Exited(LeanWorkerExit {
829                success: false,
830                code: None,
831                status: "worker is not running".to_owned(),
832                diagnostics: String::new(),
833            }));
834        };
835        match child.try_wait().map_err(|source| LeanWorkerError::Wait { source })? {
836            Some(status) => {
837                let diagnostics = self.read_stderr();
838                let exit = LeanWorkerExit::from_status(status, diagnostics);
839                self.last_exit = Some(exit.clone());
840                self.child = None;
841                self.stdin = None;
842                self.stdout = None;
843                self.stats.exits = self.stats.exits.saturating_add(1);
844                Ok(LeanWorkerStatus::Exited(exit))
845            }
846            None => Ok(LeanWorkerStatus::Running),
847        }
848    }
849
850    /// Return lifecycle counters for this supervisor.
851    #[must_use]
852    pub fn stats(&self) -> LeanWorkerStats {
853        self.stats.clone()
854    }
855
856    /// Return protocol/runtime facts reported by the worker child.
857    #[must_use]
858    pub fn runtime_metadata(&self) -> LeanWorkerRuntimeMetadata {
859        self.runtime_metadata.clone()
860    }
861
862    /// Measure the current child RSS in KiB when supported by the platform.
863    ///
864    /// This is an observability hook for restart policy and memory-cycling
865    /// workloads. A `None` result means the platform did not provide a usable
866    /// sample; it is not a worker failure.
867    pub fn rss_kib(&mut self) -> Option<u64> {
868        match self.child_rss_kib() {
869            Some(value) => {
870                self.stats.last_rss_kib = Some(value);
871                Some(value)
872            }
873            None => {
874                self.stats.rss_samples_unavailable = self.stats.rss_samples_unavailable.saturating_add(1);
875                None
876            }
877        }
878    }
879
880    /// Return the timeout used for subsequent worker requests.
881    #[must_use]
882    pub fn request_timeout(&self) -> Duration {
883        self.config.request_timeout
884    }
885
886    /// Change the timeout for subsequent worker requests.
887    ///
888    /// This changes supervisor policy only. The supervisor still owns the
889    /// deadline, child kill, replacement, and restart accounting.
890    pub fn set_request_timeout(&mut self, timeout: Duration) {
891        self.config.request_timeout = timeout;
892    }
893
894    /// Explicitly cycle the worker process.
895    ///
896    /// This is the manual memory-reset operation. It terminates the current
897    /// child, starts a replacement with the original configuration, and records
898    /// `LeanWorkerRestartReason::Explicit`.
899    ///
900    /// # Errors
901    ///
902    /// Returns `LeanWorkerError` if the existing child cannot be waited on or
903    /// the replacement child cannot be spawned and handshaken.
904    pub fn cycle(&mut self) -> Result<(), LeanWorkerError> {
905        self.restart_with_reason(LeanWorkerRestartReason::Explicit)
906    }
907
908    pub(crate) fn cycle_with_restart_reason(&mut self, reason: LeanWorkerRestartReason) -> Result<(), LeanWorkerError> {
909        self.restart_with_reason(reason)
910    }
911
912    /// Restart this worker using its original configuration.
913    ///
914    /// This is an explicit lifecycle operation. Prompt 58 adds policy-driven
915    /// restarts for memory cycling; this method only gives callers a direct
916    /// reset point.
917    ///
918    /// # Errors
919    ///
920    /// Returns `LeanWorkerError` if the existing child cannot be waited on or
921    /// the replacement child cannot be spawned and handshaken.
922    pub fn restart(&mut self) -> Result<(), LeanWorkerError> {
923        self.cycle()
924    }
925
926    #[doc(hidden)]
927    /// Kill the child process for supervisor tests.
928    ///
929    /// # Errors
930    ///
931    /// Returns `LeanWorkerError` if the worker is already dead or the OS kill
932    /// request fails.
933    pub fn __kill_for_test(&mut self) -> Result<(), LeanWorkerError> {
934        let Some(child) = self.child.as_mut() else {
935            return Err(self.dead_error());
936        };
937        child.kill().map_err(|source| LeanWorkerError::Wait { source })?;
938        Ok(())
939    }
940
941    /// Ask the child to terminate cleanly and wait for it.
942    ///
943    /// # Errors
944    ///
945    /// Returns `LeanWorkerError` if the worker is already dead, the protocol
946    /// fails, or waiting for the child process fails.
947    pub fn terminate(mut self) -> Result<LeanWorkerExit, LeanWorkerError> {
948        self.send_request(Request::Terminate)?;
949        match self.read_response("terminate")? {
950            Response::Terminating => self.wait_for_exit(),
951            other @ (Response::HealthOk
952            | Response::CapabilityLoaded
953            | Response::U64 { .. }
954            | Response::HostSessionOpened
955            | Response::Elaboration { .. }
956            | Response::KernelCheck { .. }
957            | Response::Strings { .. }
958            | Response::StreamComplete { .. }
959            | Response::StreamExportFailed { .. }
960            | Response::StreamCallbackFailed { .. }
961            | Response::StreamRowMalformed { .. }
962            | Response::CapabilityMetadata { .. }
963            | Response::CapabilityDoctor { .. }
964            | Response::CapabilityMetadataMalformed { .. }
965            | Response::CapabilityDoctorMalformed { .. }
966            | Response::JsonCommand { .. }
967            | Response::MetaExpr { .. }
968            | Response::MetaBool { .. }
969            | Response::Declaration { .. }
970            | Response::DeclarationBulk { .. }
971            | Response::ProcessFile { .. }
972            | Response::ProcessModule { .. }
973            | Response::RowsComplete { .. }
974            | Response::Error { .. }) => Err(unexpected_response("terminate", &other)),
975        }
976    }
977
978    #[doc(hidden)]
979    /// Trigger the prompt fixture panic path.
980    ///
981    /// # Errors
982    ///
983    /// Returns `LeanWorkerError` if the worker does not exit fatally or if the
984    /// protocol fails before the panic path runs.
985    pub fn __trigger_lean_panic_fixture(
986        mut self,
987        fixture_root: impl AsRef<Path>,
988    ) -> Result<LeanWorkerExit, LeanWorkerError> {
989        self.prepare_request(true)?;
990        self.send_request(Request::TriggerLeanPanic {
991            fixture_root: path_string(fixture_root.as_ref()),
992        })?;
993        self.record_request(true);
994        match self.read_response("trigger_lean_panic") {
995            Ok(response) => Err(unexpected_response("trigger_lean_panic", &response)),
996            Err(LeanWorkerError::ChildPanicOrAbort { exit }) => Ok(exit),
997            Err(err) => Err(err),
998        }
999    }
1000
1001    #[doc(hidden)]
1002    /// Emit synthetic worker data rows through the private protocol for row sink tests.
1003    ///
1004    /// # Errors
1005    ///
1006    /// Returns `LeanWorkerError` if the worker is dead, the sink panics,
1007    /// cancellation is observed, or protocol communication fails.
1008    pub fn __emit_test_rows(
1009        &mut self,
1010        streams: Vec<String>,
1011        cancellation: Option<&LeanWorkerCancellationToken>,
1012        data: Option<&dyn LeanWorkerDataSink>,
1013    ) -> Result<u64, LeanWorkerError> {
1014        const OPERATION: &str = "emit_test_rows";
1015        check_cancelled(OPERATION, cancellation)?;
1016        self.prepare_request(false)?;
1017        self.send_request(Request::EmitTestRows { streams })?;
1018        self.record_request(false);
1019        match self.read_response_with_events(
1020            OPERATION,
1021            None,
1022            cancellation,
1023            data.map(LeanWorkerDataSinkTarget::Value),
1024            None,
1025        )? {
1026            Response::RowsComplete { count } => Ok(count),
1027            other @ (Response::HealthOk
1028            | Response::CapabilityLoaded
1029            | Response::U64 { .. }
1030            | Response::HostSessionOpened
1031            | Response::Elaboration { .. }
1032            | Response::KernelCheck { .. }
1033            | Response::Strings { .. }
1034            | Response::StreamComplete { .. }
1035            | Response::StreamExportFailed { .. }
1036            | Response::StreamCallbackFailed { .. }
1037            | Response::StreamRowMalformed { .. }
1038            | Response::CapabilityMetadata { .. }
1039            | Response::CapabilityDoctor { .. }
1040            | Response::CapabilityMetadataMalformed { .. }
1041            | Response::CapabilityDoctorMalformed { .. }
1042            | Response::JsonCommand { .. }
1043            | Response::MetaExpr { .. }
1044            | Response::MetaBool { .. }
1045            | Response::Declaration { .. }
1046            | Response::DeclarationBulk { .. }
1047            | Response::ProcessFile { .. }
1048            | Response::ProcessModule { .. }
1049            | Response::Terminating
1050            | Response::Error { .. }) => Err(unexpected_response(OPERATION, &other)),
1051        }
1052    }
1053
1054    pub(crate) fn open_worker_session(
1055        &mut self,
1056        config: &LeanWorkerSessionConfig,
1057        cancellation: Option<&LeanWorkerCancellationToken>,
1058        progress: Option<&dyn LeanWorkerProgressSink>,
1059    ) -> Result<(), LeanWorkerError> {
1060        const OPERATION: &str = "open_worker_session";
1061        check_cancelled(OPERATION, cancellation)?;
1062        self.prepare_request(true)?;
1063        self.send_request(Request::OpenHostSession {
1064            project_root: config.project_root_string(),
1065            package: config.package().to_owned(),
1066            lib_name: config.lib_name().to_owned(),
1067            imports: config.imports().to_vec(),
1068        })?;
1069        self.record_request(true);
1070        match self.read_response_with_progress(OPERATION, progress, cancellation)? {
1071            Response::HostSessionOpened => Ok(()),
1072            other @ (Response::HealthOk
1073            | Response::CapabilityLoaded
1074            | Response::U64 { .. }
1075            | Response::Elaboration { .. }
1076            | Response::KernelCheck { .. }
1077            | Response::Strings { .. }
1078            | Response::StreamComplete { .. }
1079            | Response::StreamExportFailed { .. }
1080            | Response::StreamCallbackFailed { .. }
1081            | Response::StreamRowMalformed { .. }
1082            | Response::CapabilityMetadata { .. }
1083            | Response::CapabilityDoctor { .. }
1084            | Response::CapabilityMetadataMalformed { .. }
1085            | Response::CapabilityDoctorMalformed { .. }
1086            | Response::JsonCommand { .. }
1087            | Response::MetaExpr { .. }
1088            | Response::MetaBool { .. }
1089            | Response::Declaration { .. }
1090            | Response::DeclarationBulk { .. }
1091            | Response::ProcessFile { .. }
1092            | Response::ProcessModule { .. }
1093            | Response::RowsComplete { .. }
1094            | Response::Terminating
1095            | Response::Error { .. }) => Err(unexpected_response(OPERATION, &other)),
1096        }
1097    }
1098
1099    #[expect(
1100        clippy::wildcard_enum_match_arm,
1101        reason = "round_trip deliberately collapses per-method Response wildcards into a uniform unexpected_response branch; a new variant surfaces at runtime, not compile time"
1102    )]
1103    pub(crate) fn worker_elaborate(
1104        &mut self,
1105        source: &str,
1106        options: &LeanWorkerElabOptions,
1107        cancellation: Option<&LeanWorkerCancellationToken>,
1108        progress: Option<&dyn LeanWorkerProgressSink>,
1109    ) -> Result<LeanWorkerElabResult, LeanWorkerError> {
1110        self.round_trip(
1111            "worker_elaborate",
1112            Request::Elaborate {
1113                source: source.to_owned(),
1114                options: options.clone(),
1115            },
1116            false,
1117            cancellation,
1118            progress,
1119            |response, operation| match response {
1120                Response::Elaboration { outcome } => Ok(outcome),
1121                other => Err(unexpected_response(operation, &other)),
1122            },
1123        )
1124    }
1125
1126    #[expect(
1127        clippy::wildcard_enum_match_arm,
1128        reason = "round_trip deliberately collapses per-method Response wildcards into a uniform unexpected_response branch; a new variant surfaces at runtime, not compile time"
1129    )]
1130    pub(crate) fn worker_kernel_check(
1131        &mut self,
1132        source: &str,
1133        options: &LeanWorkerElabOptions,
1134        cancellation: Option<&LeanWorkerCancellationToken>,
1135        progress: Option<&dyn LeanWorkerProgressSink>,
1136    ) -> Result<LeanWorkerKernelResult, LeanWorkerError> {
1137        self.round_trip(
1138            "worker_kernel_check",
1139            Request::KernelCheck {
1140                source: source.to_owned(),
1141                options: options.clone(),
1142                progress: progress.is_some(),
1143            },
1144            false,
1145            cancellation,
1146            progress,
1147            |response, operation| match response {
1148                Response::KernelCheck { outcome } => Ok(outcome),
1149                other => Err(unexpected_response(operation, &other)),
1150            },
1151        )
1152    }
1153
1154    #[expect(
1155        clippy::wildcard_enum_match_arm,
1156        reason = "round_trip deliberately collapses per-method Response wildcards into a uniform unexpected_response branch; a new variant surfaces at runtime, not compile time"
1157    )]
1158    pub(crate) fn worker_declaration_kinds(
1159        &mut self,
1160        names: &[&str],
1161        cancellation: Option<&LeanWorkerCancellationToken>,
1162        progress: Option<&dyn LeanWorkerProgressSink>,
1163    ) -> Result<Vec<String>, LeanWorkerError> {
1164        self.round_trip(
1165            "worker_declaration_kinds",
1166            Request::DeclarationKinds {
1167                names: names.iter().map(|name| (*name).to_owned()).collect(),
1168                progress: progress.is_some(),
1169            },
1170            false,
1171            cancellation,
1172            progress,
1173            |response, operation| match response {
1174                Response::Strings { values } => Ok(values),
1175                other => Err(unexpected_response(operation, &other)),
1176            },
1177        )
1178    }
1179
1180    #[expect(
1181        clippy::wildcard_enum_match_arm,
1182        reason = "round_trip deliberately collapses per-method Response wildcards into a uniform unexpected_response branch; a new variant surfaces at runtime, not compile time"
1183    )]
1184    pub(crate) fn worker_declaration_names(
1185        &mut self,
1186        names: &[&str],
1187        cancellation: Option<&LeanWorkerCancellationToken>,
1188        progress: Option<&dyn LeanWorkerProgressSink>,
1189    ) -> Result<Vec<String>, LeanWorkerError> {
1190        self.round_trip(
1191            "worker_declaration_names",
1192            Request::DeclarationNames {
1193                names: names.iter().map(|name| (*name).to_owned()).collect(),
1194                progress: progress.is_some(),
1195            },
1196            false,
1197            cancellation,
1198            progress,
1199            |response, operation| match response {
1200                Response::Strings { values } => Ok(values),
1201                other => Err(unexpected_response(operation, &other)),
1202            },
1203        )
1204    }
1205
1206    #[expect(
1207        clippy::wildcard_enum_match_arm,
1208        reason = "round_trip deliberately collapses per-method Response wildcards into a uniform unexpected_response branch; a new variant surfaces at runtime, not compile time"
1209    )]
1210    pub(crate) fn worker_infer_type(
1211        &mut self,
1212        source: &str,
1213        options: &LeanWorkerElabOptions,
1214        cancellation: Option<&LeanWorkerCancellationToken>,
1215        progress: Option<&dyn LeanWorkerProgressSink>,
1216    ) -> Result<LeanWorkerMetaResult<LeanWorkerRendered>, LeanWorkerError> {
1217        self.round_trip(
1218            "worker_infer_type",
1219            Request::InferType {
1220                source: source.to_owned(),
1221                options: options.clone(),
1222            },
1223            false,
1224            cancellation,
1225            progress,
1226            |response, operation| match response {
1227                Response::MetaExpr { result } => Ok(result),
1228                other => Err(unexpected_response(operation, &other)),
1229            },
1230        )
1231    }
1232
1233    #[expect(
1234        clippy::wildcard_enum_match_arm,
1235        reason = "round_trip deliberately collapses per-method Response wildcards into a uniform unexpected_response branch; a new variant surfaces at runtime, not compile time"
1236    )]
1237    pub(crate) fn worker_whnf(
1238        &mut self,
1239        source: &str,
1240        options: &LeanWorkerElabOptions,
1241        cancellation: Option<&LeanWorkerCancellationToken>,
1242        progress: Option<&dyn LeanWorkerProgressSink>,
1243    ) -> Result<LeanWorkerMetaResult<LeanWorkerRendered>, LeanWorkerError> {
1244        self.round_trip(
1245            "worker_whnf",
1246            Request::Whnf {
1247                source: source.to_owned(),
1248                options: options.clone(),
1249            },
1250            false,
1251            cancellation,
1252            progress,
1253            |response, operation| match response {
1254                Response::MetaExpr { result } => Ok(result),
1255                other => Err(unexpected_response(operation, &other)),
1256            },
1257        )
1258    }
1259
1260    #[expect(
1261        clippy::wildcard_enum_match_arm,
1262        reason = "round_trip deliberately collapses per-method Response wildcards into a uniform unexpected_response branch; a new variant surfaces at runtime, not compile time"
1263    )]
1264    pub(crate) fn worker_is_def_eq(
1265        &mut self,
1266        lhs: &str,
1267        rhs: &str,
1268        transparency: LeanWorkerMetaTransparency,
1269        options: &LeanWorkerElabOptions,
1270        cancellation: Option<&LeanWorkerCancellationToken>,
1271        progress: Option<&dyn LeanWorkerProgressSink>,
1272    ) -> Result<LeanWorkerMetaResult<bool>, LeanWorkerError> {
1273        self.round_trip(
1274            "worker_is_def_eq",
1275            Request::IsDefEq {
1276                lhs: lhs.to_owned(),
1277                rhs: rhs.to_owned(),
1278                transparency,
1279                options: options.clone(),
1280            },
1281            false,
1282            cancellation,
1283            progress,
1284            |response, operation| match response {
1285                Response::MetaBool { result } => Ok(result),
1286                other => Err(unexpected_response(operation, &other)),
1287            },
1288        )
1289    }
1290
1291    #[expect(
1292        clippy::wildcard_enum_match_arm,
1293        reason = "round_trip deliberately collapses per-method Response wildcards into a uniform unexpected_response branch; a new variant surfaces at runtime, not compile time"
1294    )]
1295    pub(crate) fn worker_describe(
1296        &mut self,
1297        name: &str,
1298        cancellation: Option<&LeanWorkerCancellationToken>,
1299        progress: Option<&dyn LeanWorkerProgressSink>,
1300    ) -> Result<Option<LeanWorkerDeclarationRow>, LeanWorkerError> {
1301        self.round_trip(
1302            "worker_describe",
1303            Request::Describe { name: name.to_owned() },
1304            false,
1305            cancellation,
1306            progress,
1307            |response, operation| match response {
1308                Response::Declaration { row } => Ok(row),
1309                other => Err(unexpected_response(operation, &other)),
1310            },
1311        )
1312    }
1313
1314    #[allow(
1315        clippy::needless_pass_by_value,
1316        reason = "filter is cheap to clone, passed by value matches caller shape"
1317    )]
1318    pub(crate) fn worker_list_declarations_strings(
1319        &mut self,
1320        filter: LeanWorkerDeclarationFilter,
1321        cancellation: Option<&LeanWorkerCancellationToken>,
1322        progress: Option<&dyn LeanWorkerProgressSink>,
1323    ) -> Result<Vec<String>, LeanWorkerError> {
1324        const OPERATION: &str = "worker_list_declarations_strings";
1325        check_cancelled(OPERATION, cancellation)?;
1326        self.prepare_request(false)?;
1327        self.send_request(Request::ListDeclarationsStrings {
1328            filter,
1329            progress: progress.is_some(),
1330        })?;
1331        self.record_request(false);
1332        let collector = DeclarationNameCollector::default();
1333        let response = self.read_response_with_events(
1334            OPERATION,
1335            progress,
1336            cancellation,
1337            Some(LeanWorkerDataSinkTarget::Raw(&collector)),
1338            None,
1339        )?;
1340        if let Some(message) = collector.decode_error.lock().ok().and_then(|guard| guard.clone()) {
1341            return Err(LeanWorkerError::Protocol { message });
1342        }
1343        match response {
1344            Response::RowsComplete { count } => {
1345                let names = collector.into_inner();
1346                let observed = u64::try_from(names.len()).unwrap_or(u64::MAX);
1347                if observed != count {
1348                    return Err(LeanWorkerError::Protocol {
1349                        message: format!(
1350                            "worker_list_declarations_strings: parent collected {observed} rows but child reported {count}"
1351                        ),
1352                    });
1353                }
1354                Ok(names)
1355            }
1356            other @ (Response::HealthOk
1357            | Response::CapabilityLoaded
1358            | Response::U64 { .. }
1359            | Response::HostSessionOpened
1360            | Response::Elaboration { .. }
1361            | Response::KernelCheck { .. }
1362            | Response::Strings { .. }
1363            | Response::StreamComplete { .. }
1364            | Response::StreamExportFailed { .. }
1365            | Response::StreamCallbackFailed { .. }
1366            | Response::StreamRowMalformed { .. }
1367            | Response::CapabilityMetadata { .. }
1368            | Response::CapabilityDoctor { .. }
1369            | Response::CapabilityMetadataMalformed { .. }
1370            | Response::CapabilityDoctorMalformed { .. }
1371            | Response::JsonCommand { .. }
1372            | Response::MetaExpr { .. }
1373            | Response::MetaBool { .. }
1374            | Response::Declaration { .. }
1375            | Response::DeclarationBulk { .. }
1376            | Response::ProcessFile { .. }
1377            | Response::ProcessModule { .. }
1378            | Response::Terminating
1379            | Response::Error { .. }) => Err(unexpected_response(OPERATION, &other)),
1380        }
1381    }
1382
1383    #[expect(
1384        clippy::wildcard_enum_match_arm,
1385        reason = "round_trip deliberately collapses per-method Response wildcards into a uniform unexpected_response branch; a new variant surfaces at runtime, not compile time"
1386    )]
1387    pub(crate) fn worker_describe_bulk(
1388        &mut self,
1389        names: &[&str],
1390        cancellation: Option<&LeanWorkerCancellationToken>,
1391        progress: Option<&dyn LeanWorkerProgressSink>,
1392    ) -> Result<Vec<LeanWorkerDeclarationRow>, LeanWorkerError> {
1393        self.round_trip(
1394            "worker_describe_bulk",
1395            Request::DescribeBulk {
1396                names: names.iter().map(|name| (*name).to_owned()).collect(),
1397                progress: progress.is_some(),
1398            },
1399            false,
1400            cancellation,
1401            progress,
1402            |response, operation| match response {
1403                Response::DeclarationBulk { rows } => Ok(rows),
1404                other => Err(unexpected_response(operation, &other)),
1405            },
1406        )
1407    }
1408
1409    #[expect(
1410        clippy::wildcard_enum_match_arm,
1411        reason = "round_trip deliberately collapses per-method Response wildcards into a uniform unexpected_response branch; a new variant surfaces at runtime, not compile time"
1412    )]
1413    pub(crate) fn worker_process_file(
1414        &mut self,
1415        source: &str,
1416        options: &LeanWorkerElabOptions,
1417        cancellation: Option<&LeanWorkerCancellationToken>,
1418        progress: Option<&dyn LeanWorkerProgressSink>,
1419    ) -> Result<LeanWorkerProcessFileOutcome, LeanWorkerError> {
1420        self.round_trip(
1421            "worker_process_file",
1422            Request::ProcessFile {
1423                source: source.to_owned(),
1424                options: options.clone(),
1425            },
1426            false,
1427            cancellation,
1428            progress,
1429            |response, operation| match response {
1430                Response::ProcessFile { outcome } => Ok(outcome),
1431                other => Err(unexpected_response(operation, &other)),
1432            },
1433        )
1434    }
1435
1436    #[expect(
1437        clippy::wildcard_enum_match_arm,
1438        reason = "round_trip deliberately collapses per-method Response wildcards into a uniform unexpected_response branch; a new variant surfaces at runtime, not compile time"
1439    )]
1440    pub(crate) fn worker_process_module(
1441        &mut self,
1442        source: &str,
1443        options: &LeanWorkerElabOptions,
1444        cancellation: Option<&LeanWorkerCancellationToken>,
1445        progress: Option<&dyn LeanWorkerProgressSink>,
1446    ) -> Result<LeanWorkerProcessModuleOutcome, LeanWorkerError> {
1447        self.round_trip(
1448            "worker_process_module",
1449            Request::ProcessModule {
1450                source: source.to_owned(),
1451                options: options.clone(),
1452            },
1453            false,
1454            cancellation,
1455            progress,
1456            |response, operation| match response {
1457                Response::ProcessModule { outcome } => Ok(outcome),
1458                other => Err(unexpected_response(operation, &other)),
1459            },
1460        )
1461    }
1462
1463    pub(crate) fn worker_run_data_stream(
1464        &mut self,
1465        export: &str,
1466        request: &serde_json::Value,
1467        rows: &dyn LeanWorkerDataSink,
1468        diagnostics: Option<&dyn LeanWorkerDiagnosticSink>,
1469        cancellation: Option<&LeanWorkerCancellationToken>,
1470        progress: Option<&dyn LeanWorkerProgressSink>,
1471    ) -> Result<LeanWorkerStreamSummary, LeanWorkerError> {
1472        self.worker_run_data_stream_with_sink(
1473            export,
1474            request,
1475            LeanWorkerDataSinkTarget::Value(rows),
1476            diagnostics,
1477            cancellation,
1478            progress,
1479        )
1480    }
1481
1482    pub(crate) fn worker_run_data_stream_raw(
1483        &mut self,
1484        export: &str,
1485        request: &serde_json::Value,
1486        rows: &dyn LeanWorkerRawDataSink,
1487        diagnostics: Option<&dyn LeanWorkerDiagnosticSink>,
1488        cancellation: Option<&LeanWorkerCancellationToken>,
1489        progress: Option<&dyn LeanWorkerProgressSink>,
1490    ) -> Result<LeanWorkerStreamSummary, LeanWorkerError> {
1491        self.worker_run_data_stream_with_sink(
1492            export,
1493            request,
1494            LeanWorkerDataSinkTarget::Raw(rows),
1495            diagnostics,
1496            cancellation,
1497            progress,
1498        )
1499    }
1500
1501    fn worker_run_data_stream_with_sink(
1502        &mut self,
1503        export: &str,
1504        request: &serde_json::Value,
1505        rows: LeanWorkerDataSinkTarget<'_>,
1506        diagnostics: Option<&dyn LeanWorkerDiagnosticSink>,
1507        cancellation: Option<&LeanWorkerCancellationToken>,
1508        progress: Option<&dyn LeanWorkerProgressSink>,
1509    ) -> Result<LeanWorkerStreamSummary, LeanWorkerError> {
1510        const OPERATION: &str = "worker_run_data_stream";
1511        check_cancelled(OPERATION, cancellation)?;
1512        let request_json = serde_json::to_string(request).map_err(|err| LeanWorkerError::Protocol {
1513            message: format!("worker data-stream request JSON encode failed: {err}"),
1514        })?;
1515        self.prepare_request(false)?;
1516        self.send_request(Request::RunDataStream {
1517            export: export.to_owned(),
1518            request_json,
1519            progress: progress.is_some(),
1520        })?;
1521        self.record_request(false);
1522        self.stats.stream_requests = self.stats.stream_requests.saturating_add(1);
1523        match self.read_response_with_events(OPERATION, progress, cancellation, Some(rows), diagnostics)? {
1524            Response::StreamComplete { summary } => Ok(summary.into()),
1525            Response::StreamExportFailed { status_byte } => {
1526                Err(LeanWorkerError::StreamExportFailed { status: status_byte })
1527            }
1528            Response::StreamCallbackFailed {
1529                status_byte,
1530                description,
1531            } => Err(LeanWorkerError::StreamCallbackFailed {
1532                status: status_byte,
1533                description,
1534            }),
1535            Response::StreamRowMalformed { message } => Err(LeanWorkerError::StreamRowMalformed { message }),
1536            other @ (Response::HealthOk
1537            | Response::CapabilityLoaded
1538            | Response::U64 { .. }
1539            | Response::HostSessionOpened
1540            | Response::Elaboration { .. }
1541            | Response::KernelCheck { .. }
1542            | Response::Strings { .. }
1543            | Response::RowsComplete { .. }
1544            | Response::CapabilityMetadata { .. }
1545            | Response::CapabilityDoctor { .. }
1546            | Response::CapabilityMetadataMalformed { .. }
1547            | Response::CapabilityDoctorMalformed { .. }
1548            | Response::JsonCommand { .. }
1549            | Response::MetaExpr { .. }
1550            | Response::MetaBool { .. }
1551            | Response::Declaration { .. }
1552            | Response::DeclarationBulk { .. }
1553            | Response::ProcessFile { .. }
1554            | Response::ProcessModule { .. }
1555            | Response::Terminating
1556            | Response::Error { .. }) => Err(unexpected_response(OPERATION, &other)),
1557        }
1558    }
1559
1560    #[expect(
1561        clippy::wildcard_enum_match_arm,
1562        reason = "round_trip deliberately collapses per-method Response wildcards into a uniform unexpected_response branch; a new variant surfaces at runtime, not compile time"
1563    )]
1564    pub(crate) fn worker_capability_metadata(
1565        &mut self,
1566        export: &str,
1567        request: &serde_json::Value,
1568        cancellation: Option<&LeanWorkerCancellationToken>,
1569        progress: Option<&dyn LeanWorkerProgressSink>,
1570    ) -> Result<LeanWorkerCapabilityMetadata, LeanWorkerError> {
1571        let request_json = serde_json::to_string(request).map_err(|err| LeanWorkerError::Protocol {
1572            message: format!("worker capability metadata request JSON encode failed: {err}"),
1573        })?;
1574        self.round_trip(
1575            "worker_capability_metadata",
1576            Request::CapabilityMetadata {
1577                export: export.to_owned(),
1578                request_json,
1579            },
1580            false,
1581            cancellation,
1582            progress,
1583            |response, operation| match response {
1584                Response::CapabilityMetadata { metadata } => Ok(metadata),
1585                Response::CapabilityMetadataMalformed { message } => {
1586                    Err(LeanWorkerError::CapabilityMetadataMalformed { message })
1587                }
1588                other => Err(unexpected_response(operation, &other)),
1589            },
1590        )
1591    }
1592
1593    #[expect(
1594        clippy::wildcard_enum_match_arm,
1595        reason = "round_trip deliberately collapses per-method Response wildcards into a uniform unexpected_response branch; a new variant surfaces at runtime, not compile time"
1596    )]
1597    pub(crate) fn worker_capability_doctor(
1598        &mut self,
1599        export: &str,
1600        request: &serde_json::Value,
1601        cancellation: Option<&LeanWorkerCancellationToken>,
1602        progress: Option<&dyn LeanWorkerProgressSink>,
1603    ) -> Result<LeanWorkerDoctorReport, LeanWorkerError> {
1604        let request_json = serde_json::to_string(request).map_err(|err| LeanWorkerError::Protocol {
1605            message: format!("worker capability doctor request JSON encode failed: {err}"),
1606        })?;
1607        self.round_trip(
1608            "worker_capability_doctor",
1609            Request::CapabilityDoctor {
1610                export: export.to_owned(),
1611                request_json,
1612            },
1613            false,
1614            cancellation,
1615            progress,
1616            |response, operation| match response {
1617                Response::CapabilityDoctor { report } => Ok(report),
1618                Response::CapabilityDoctorMalformed { message } => {
1619                    Err(LeanWorkerError::CapabilityDoctorMalformed { message })
1620                }
1621                other => Err(unexpected_response(operation, &other)),
1622            },
1623        )
1624    }
1625
1626    #[expect(
1627        clippy::wildcard_enum_match_arm,
1628        reason = "round_trip deliberately collapses per-method Response wildcards into a uniform unexpected_response branch; a new variant surfaces at runtime, not compile time"
1629    )]
1630    pub(crate) fn worker_json_command(
1631        &mut self,
1632        export: &str,
1633        request_json: String,
1634        cancellation: Option<&LeanWorkerCancellationToken>,
1635        progress: Option<&dyn LeanWorkerProgressSink>,
1636    ) -> Result<String, LeanWorkerError> {
1637        self.round_trip(
1638            "worker_json_command",
1639            Request::JsonCommand {
1640                export: export.to_owned(),
1641                request_json,
1642            },
1643            false,
1644            cancellation,
1645            progress,
1646            |response, operation| match response {
1647                Response::JsonCommand { response_json } => Ok(response_json),
1648                other => Err(unexpected_response(operation, &other)),
1649            },
1650        )
1651    }
1652
1653    fn send_request(&mut self, request: Request) -> Result<(), LeanWorkerError> {
1654        self.ensure_running()?;
1655        let Some(stdin) = self.stdin.as_mut() else {
1656            return Err(self.dead_error());
1657        };
1658        write_frame(stdin, Message::Request(request)).map_err(|err| LeanWorkerError::Protocol {
1659            message: err.to_string(),
1660        })
1661    }
1662
1663    fn prepare_request(&mut self, import_like: bool) -> Result<(), LeanWorkerError> {
1664        self.ensure_running()?;
1665
1666        if let Some(limit) = self.config.restart_policy.max_requests
1667            && self.requests_since_restart >= limit
1668        {
1669            return self.restart_with_reason(LeanWorkerRestartReason::MaxRequests { limit });
1670        }
1671
1672        if import_like
1673            && let Some(limit) = self.config.restart_policy.max_imports
1674            && self.imports_since_restart >= limit
1675        {
1676            return self.restart_with_reason(LeanWorkerRestartReason::MaxImports { limit });
1677        }
1678
1679        if let Some(limit_kib) = self.config.restart_policy.max_rss_kib {
1680            match self.child_rss_kib() {
1681                Some(current_kib) if current_kib >= limit_kib => {
1682                    self.stats.last_rss_kib = Some(current_kib);
1683                    return self.restart_with_reason(LeanWorkerRestartReason::RssCeiling { current_kib, limit_kib });
1684                }
1685                Some(current_kib) => {
1686                    self.stats.last_rss_kib = Some(current_kib);
1687                }
1688                None => {
1689                    self.stats.rss_samples_unavailable = self.stats.rss_samples_unavailable.saturating_add(1);
1690                }
1691            }
1692        }
1693
1694        if let Some(limit) = self.config.restart_policy.idle_restart_after {
1695            let idle_for = self.last_activity.elapsed();
1696            if idle_for >= limit {
1697                return self.restart_with_reason(LeanWorkerRestartReason::Idle { idle_for, limit });
1698            }
1699        }
1700
1701        Ok(())
1702    }
1703
1704    fn record_request(&mut self, import_like: bool) {
1705        self.stats.requests = self.stats.requests.saturating_add(1);
1706        self.requests_since_restart = self.requests_since_restart.saturating_add(1);
1707        if import_like {
1708            self.stats.imports = self.stats.imports.saturating_add(1);
1709            self.imports_since_restart = self.imports_since_restart.saturating_add(1);
1710        }
1711        self.last_activity = Instant::now();
1712    }
1713
1714    fn restart_with_reason(&mut self, reason: LeanWorkerRestartReason) -> Result<(), LeanWorkerError> {
1715        let config = self.config.clone();
1716        self.stop_existing_child()?;
1717        self.stats.record_restart(reason);
1718        self.requests_since_restart = 0;
1719        self.imports_since_restart = 0;
1720        let mut next = Self::spawn(&config)?;
1721        next.stats = self.stats.clone();
1722        next.last_activity = Instant::now();
1723        *self = next;
1724        Ok(())
1725    }
1726
1727    fn read_response(&mut self, operation: &'static str) -> Result<Response, LeanWorkerError> {
1728        self.read_response_with_events(operation, None, None, None, None)
1729    }
1730
1731    fn read_response_with_progress(
1732        &mut self,
1733        operation: &'static str,
1734        progress: Option<&dyn LeanWorkerProgressSink>,
1735        cancellation: Option<&LeanWorkerCancellationToken>,
1736    ) -> Result<Response, LeanWorkerError> {
1737        self.read_response_with_events(operation, progress, cancellation, None, None)
1738    }
1739
1740    /// Run one Request/Response round-trip and project the response into a
1741    /// typed value.
1742    ///
1743    /// Centralizes the cancel-check → send → record → read sequence so every
1744    /// `worker_*` helper above delegates here instead of repeating five
1745    /// identical lines plus a 22-variant wildcard arm. The `extract` closure
1746    /// receives the response together with the operation name; it returns the
1747    /// typed value, the typed error variant the protocol expects (e.g.,
1748    /// `*Malformed`), or `unexpected_response(operation, &other)` for any
1749    /// wire variant the operation never expects.
1750    fn round_trip<R>(
1751        &mut self,
1752        operation: &'static str,
1753        request: Request,
1754        import_like: bool,
1755        cancellation: Option<&LeanWorkerCancellationToken>,
1756        progress: Option<&dyn LeanWorkerProgressSink>,
1757        extract: impl FnOnce(Response, &'static str) -> Result<R, LeanWorkerError>,
1758    ) -> Result<R, LeanWorkerError> {
1759        check_cancelled(operation, cancellation)?;
1760        self.prepare_request(import_like)?;
1761        self.send_request(request)?;
1762        self.record_request(import_like);
1763        let response = self.read_response_with_progress(operation, progress, cancellation)?;
1764        extract(response, operation)
1765    }
1766
1767    fn read_response_with_events(
1768        &mut self,
1769        operation: &'static str,
1770        progress: Option<&dyn LeanWorkerProgressSink>,
1771        cancellation: Option<&LeanWorkerCancellationToken>,
1772        data: Option<LeanWorkerDataSinkTarget<'_>>,
1773        diagnostics: Option<&dyn LeanWorkerDiagnosticSink>,
1774    ) -> Result<Response, LeanWorkerError> {
1775        let started = Instant::now();
1776        let timeout = self.config.request_timeout;
1777        let deadline = started.checked_add(timeout);
1778        let streaming = data.is_some();
1779        let mut request_backpressure_waits = 0_u64;
1780        let stdout = self.stdout.take().ok_or_else(|| self.dead_error())?;
1781        let (sender, receiver) = mpsc::sync_channel(WORKER_EVENT_BUFFER_CAPACITY);
1782        let _reader = thread::spawn(move || read_request_messages(stdout, sender));
1783
1784        loop {
1785            let event = match deadline.and_then(|deadline| deadline.checked_duration_since(Instant::now())) {
1786                Some(remaining) if remaining.is_zero() => {
1787                    if streaming {
1788                        self.record_stream_failure(started, request_backpressure_waits);
1789                    }
1790                    self.restart_with_reason(LeanWorkerRestartReason::RequestTimeout {
1791                        operation,
1792                        duration: timeout,
1793                    })?;
1794                    return Err(LeanWorkerError::Timeout {
1795                        operation,
1796                        duration: timeout,
1797                    });
1798                }
1799                Some(remaining) => match receiver.recv_timeout(remaining) {
1800                    Ok(event) => event,
1801                    Err(mpsc::RecvTimeoutError::Timeout) => {
1802                        if streaming {
1803                            self.record_stream_failure(started, request_backpressure_waits);
1804                        }
1805                        self.restart_with_reason(LeanWorkerRestartReason::RequestTimeout {
1806                            operation,
1807                            duration: timeout,
1808                        })?;
1809                        return Err(LeanWorkerError::Timeout {
1810                            operation,
1811                            duration: timeout,
1812                        });
1813                    }
1814                    Err(mpsc::RecvTimeoutError::Disconnected) => {
1815                        return Err(LeanWorkerError::Protocol {
1816                            message: "worker response reader exited without a terminal response".to_owned(),
1817                        });
1818                    }
1819                },
1820                None => match receiver.recv() {
1821                    Ok(event) => event,
1822                    Err(_err) => {
1823                        return Err(LeanWorkerError::Protocol {
1824                            message: "worker response reader exited without a terminal response".to_owned(),
1825                        });
1826                    }
1827                },
1828            };
1829            request_backpressure_waits = request_backpressure_waits.saturating_add(event.backpressure_waits());
1830            self.stats.backpressure_waits = self.stats.backpressure_waits.saturating_add(event.backpressure_waits());
1831
1832            let message = match event {
1833                RequestReaderEvent::Message { message, .. } => message,
1834                RequestReaderEvent::Terminal { message, stdout, .. } => {
1835                    self.stdout = Some(stdout);
1836                    match message {
1837                        Message::Response(Response::Error { code, message }) => {
1838                            if streaming {
1839                                self.record_stream_failure(started, request_backpressure_waits);
1840                            }
1841                            return Err(LeanWorkerError::Worker { code, message });
1842                        }
1843                        Message::Response(response) => {
1844                            if streaming {
1845                                if matches!(response, Response::StreamComplete { .. }) {
1846                                    self.record_stream_success(started);
1847                                } else {
1848                                    self.record_stream_failure(started, request_backpressure_waits);
1849                                }
1850                            }
1851                            return Ok(response);
1852                        }
1853                        other @ (Message::Handshake { .. }
1854                        | Message::Request(_)
1855                        | Message::Diagnostic(_)
1856                        | Message::ProgressTick(_)
1857                        | Message::DataRow(_)
1858                        | Message::FatalExit(_)) => {
1859                            return Err(LeanWorkerError::Protocol {
1860                                message: format!("worker sent unexpected {operation} message: {other:?}"),
1861                            });
1862                        }
1863                    }
1864                }
1865                RequestReaderEvent::ReadError { message, eof, .. } => {
1866                    if streaming {
1867                        self.record_stream_failure(started, request_backpressure_waits);
1868                    }
1869                    return if eof {
1870                        Err(self.record_exit_error())
1871                    } else {
1872                        Err(LeanWorkerError::Protocol { message })
1873                    };
1874                }
1875            };
1876
1877            match message {
1878                Message::ProgressTick(tick) => {
1879                    if let Err(err) =
1880                        report_parent_progress(progress, elapsed_event(tick.phase, tick.current, tick.total, started))
1881                    {
1882                        if streaming {
1883                            self.record_stream_failure(started, request_backpressure_waits);
1884                        }
1885                        return Err(err);
1886                    }
1887                    if cancellation.is_some_and(LeanWorkerCancellationToken::is_cancelled) {
1888                        if streaming {
1889                            self.record_stream_failure(started, request_backpressure_waits);
1890                        }
1891                        self.restart_with_reason(LeanWorkerRestartReason::Cancelled { operation })?;
1892                        return Err(LeanWorkerError::Cancelled { operation });
1893                    }
1894                }
1895                Message::DataRow(row) => {
1896                    let payload_bytes = row.payload.get().len() as u64;
1897                    if let Err(err) = report_parent_data_row(data, row) {
1898                        if streaming {
1899                            self.record_stream_failure(started, request_backpressure_waits);
1900                        }
1901                        return Err(err);
1902                    }
1903                    self.stats.data_rows_delivered = self.stats.data_rows_delivered.saturating_add(1);
1904                    self.stats.data_row_payload_bytes = self.stats.data_row_payload_bytes.saturating_add(payload_bytes);
1905                    if cancellation.is_some_and(LeanWorkerCancellationToken::is_cancelled) {
1906                        if streaming {
1907                            self.record_stream_failure(started, request_backpressure_waits);
1908                        }
1909                        self.restart_with_reason(LeanWorkerRestartReason::Cancelled { operation })?;
1910                        return Err(LeanWorkerError::Cancelled { operation });
1911                    }
1912                }
1913                Message::Diagnostic(diagnostic) => {
1914                    if let Err(err) = report_parent_diagnostic(diagnostics, diagnostic.into()) {
1915                        if streaming {
1916                            self.record_stream_failure(started, request_backpressure_waits);
1917                        }
1918                        return Err(err);
1919                    }
1920                }
1921                Message::Response(response) => return Err(unexpected_response(operation, &response)),
1922                other @ (Message::Handshake { .. } | Message::Request(_) | Message::FatalExit(_)) => {
1923                    return Err(LeanWorkerError::Protocol {
1924                        message: format!("worker sent unexpected {operation} message: {other:?}"),
1925                    });
1926                }
1927            }
1928        }
1929    }
1930
1931    fn ensure_running(&mut self) -> Result<(), LeanWorkerError> {
1932        match self.status()? {
1933            LeanWorkerStatus::Running => Ok(()),
1934            LeanWorkerStatus::Exited(exit) if exit.success => Err(LeanWorkerError::ChildExited { exit }),
1935            LeanWorkerStatus::Exited(exit) => Err(LeanWorkerError::ChildPanicOrAbort { exit }),
1936        }
1937    }
1938
1939    fn record_stream_success(&mut self, started: Instant) {
1940        self.stats.stream_successes = self.stats.stream_successes.saturating_add(1);
1941        self.stats.stream_elapsed = self.stats.stream_elapsed.saturating_add(started.elapsed());
1942    }
1943
1944    fn record_stream_failure(&mut self, started: Instant, backpressure_waits: u64) {
1945        self.stats.stream_failures = self.stats.stream_failures.saturating_add(1);
1946        self.stats.stream_elapsed = self.stats.stream_elapsed.saturating_add(started.elapsed());
1947        if backpressure_waits > 0 {
1948            self.stats.backpressure_failures = self.stats.backpressure_failures.saturating_add(1);
1949        }
1950    }
1951
1952    fn wait_for_exit(&mut self) -> Result<LeanWorkerExit, LeanWorkerError> {
1953        let Some(child) = self.child.as_mut() else {
1954            return Err(self.dead_error());
1955        };
1956        let status = child.wait().map_err(|source| LeanWorkerError::Wait { source })?;
1957        let diagnostics = self.read_stderr();
1958        let exit = LeanWorkerExit::from_status(status, diagnostics);
1959        self.last_exit = Some(exit.clone());
1960        self.child = None;
1961        self.stdin = None;
1962        self.stdout = None;
1963        self.stats.exits = self.stats.exits.saturating_add(1);
1964        Ok(exit)
1965    }
1966
1967    fn try_record_exit(&mut self) -> Option<LeanWorkerExit> {
1968        let child = self.child.as_mut()?;
1969        let status = child.try_wait().ok().flatten()?;
1970        let diagnostics = self.read_stderr();
1971        let exit = LeanWorkerExit::from_status(status, diagnostics);
1972        self.last_exit = Some(exit.clone());
1973        self.child = None;
1974        self.stdin = None;
1975        self.stdout = None;
1976        self.stats.exits = self.stats.exits.saturating_add(1);
1977        Some(exit)
1978    }
1979
1980    fn record_exit_error(&mut self) -> LeanWorkerError {
1981        match self.wait_for_exit() {
1982            Ok(exit) if exit.success => LeanWorkerError::ChildExited { exit },
1983            Ok(exit) => LeanWorkerError::ChildPanicOrAbort { exit },
1984            Err(err) => err,
1985        }
1986    }
1987
1988    fn stop_existing_child(&mut self) -> Result<(), LeanWorkerError> {
1989        if let Some(child) = self.child.as_mut() {
1990            drop(child.kill());
1991            let status = child.wait().map_err(|source| LeanWorkerError::Wait { source })?;
1992            let diagnostics = self.read_stderr();
1993            self.last_exit = Some(LeanWorkerExit::from_status(status, diagnostics));
1994            self.stats.exits = self.stats.exits.saturating_add(1);
1995        }
1996        self.child = None;
1997        self.stdin = None;
1998        self.stdout = None;
1999        Ok(())
2000    }
2001
2002    fn dead_error(&self) -> LeanWorkerError {
2003        let exit = self.last_exit.clone().unwrap_or_else(|| LeanWorkerExit {
2004            success: false,
2005            code: None,
2006            status: "worker is not running".to_owned(),
2007            diagnostics: String::new(),
2008        });
2009        if exit.success {
2010            LeanWorkerError::ChildExited { exit }
2011        } else {
2012            LeanWorkerError::ChildPanicOrAbort { exit }
2013        }
2014    }
2015
2016    fn read_stderr(&mut self) -> String {
2017        let mut diagnostics = String::new();
2018        if let Some(mut pipe) = self.stderr.take() {
2019            drop(pipe.read_to_string(&mut diagnostics));
2020        }
2021        diagnostics
2022    }
2023
2024    fn child_rss_kib(&mut self) -> Option<u64> {
2025        let child = self.child.as_mut()?;
2026        child_rss_kib(child.id())
2027    }
2028}
2029
2030enum RequestReaderEvent {
2031    Message {
2032        message: Message,
2033        backpressure_waits: u64,
2034    },
2035    Terminal {
2036        message: Message,
2037        stdout: BufReader<ChildStdout>,
2038        backpressure_waits: u64,
2039    },
2040    ReadError {
2041        message: String,
2042        eof: bool,
2043        backpressure_waits: u64,
2044    },
2045}
2046
2047impl RequestReaderEvent {
2048    fn backpressure_waits(&self) -> u64 {
2049        match self {
2050            Self::Message { backpressure_waits, .. }
2051            | Self::Terminal { backpressure_waits, .. }
2052            | Self::ReadError { backpressure_waits, .. } => *backpressure_waits,
2053        }
2054    }
2055
2056    fn add_backpressure_wait(&mut self) {
2057        match self {
2058            Self::Message { backpressure_waits, .. }
2059            | Self::Terminal { backpressure_waits, .. }
2060            | Self::ReadError { backpressure_waits, .. } => {
2061                *backpressure_waits = backpressure_waits.saturating_add(1);
2062            }
2063        }
2064    }
2065}
2066
2067#[allow(
2068    clippy::needless_pass_by_value,
2069    reason = "the request reader thread must own the sender"
2070)]
2071fn read_request_messages(mut stdout: BufReader<ChildStdout>, sender: mpsc::SyncSender<RequestReaderEvent>) {
2072    loop {
2073        match read_frame(&mut stdout) {
2074            Ok(frame) if matches!(frame.message, Message::Response(_)) => {
2075                let _ = send_reader_event(
2076                    &sender,
2077                    RequestReaderEvent::Terminal {
2078                        message: frame.message,
2079                        stdout,
2080                        backpressure_waits: 0,
2081                    },
2082                );
2083                return;
2084            }
2085            Ok(frame) => {
2086                if send_reader_event(
2087                    &sender,
2088                    RequestReaderEvent::Message {
2089                        message: frame.message,
2090                        backpressure_waits: 0,
2091                    },
2092                )
2093                .is_err()
2094                {
2095                    return;
2096                }
2097            }
2098            Err(err) => {
2099                let _ = send_reader_event(
2100                    &sender,
2101                    RequestReaderEvent::ReadError {
2102                        message: err.to_string(),
2103                        eof: err.is_eof(),
2104                        backpressure_waits: 0,
2105                    },
2106                );
2107                return;
2108            }
2109        }
2110    }
2111}
2112
2113fn send_reader_event(sender: &mpsc::SyncSender<RequestReaderEvent>, event: RequestReaderEvent) -> Result<(), ()> {
2114    match sender.try_send(event) {
2115        Ok(()) => Ok(()),
2116        Err(mpsc::TrySendError::Full(mut event)) => {
2117            event.add_backpressure_wait();
2118            sender.send(event).map_err(|_| ())
2119        }
2120        Err(mpsc::TrySendError::Disconnected(_event)) => Err(()),
2121    }
2122}
2123
2124impl Drop for LeanWorker {
2125    fn drop(&mut self) {
2126        if let Some(child) = self.child.as_mut() {
2127            drop(child.kill());
2128            drop(child.wait());
2129        }
2130    }
2131}
2132
2133fn expect_handshake(stdout: &mut BufReader<ChildStdout>) -> Result<LeanWorkerRuntimeMetadata, LeanWorkerError> {
2134    let frame = read_frame(stdout).map_err(|err| {
2135        if err.is_eof() {
2136            LeanWorkerError::Handshake {
2137                message: "child closed stdout before handshake".to_owned(),
2138            }
2139        } else {
2140            LeanWorkerError::Handshake {
2141                message: err.to_string(),
2142            }
2143        }
2144    })?;
2145    match frame.message {
2146        Message::Handshake {
2147            worker_version,
2148            protocol_version,
2149        } if protocol_version == crate::protocol::PROTOCOL_VERSION => Ok(LeanWorkerRuntimeMetadata {
2150            worker_version,
2151            protocol_version,
2152            lean_version: None,
2153        }),
2154        other @ (Message::Handshake { .. }
2155        | Message::Request(_)
2156        | Message::Response(_)
2157        | Message::Diagnostic(_)
2158        | Message::ProgressTick(_)
2159        | Message::DataRow(_)
2160        | Message::FatalExit(_)) => Err(LeanWorkerError::Handshake {
2161            message: format!("unexpected handshake frame: {other:?}"),
2162        }),
2163    }
2164}
2165
2166fn wait_with_stderr(child: &mut Child, stderr: Option<ChildStderr>) -> Result<LeanWorkerExit, LeanWorkerError> {
2167    let status = child.wait().map_err(|source| LeanWorkerError::Wait { source })?;
2168    let mut diagnostics = String::new();
2169    if let Some(mut pipe) = stderr {
2170        drop(pipe.read_to_string(&mut diagnostics));
2171    }
2172    Ok(LeanWorkerExit::from_status(status, diagnostics))
2173}
2174
2175fn unexpected_response(operation: &'static str, response: &Response) -> LeanWorkerError {
2176    LeanWorkerError::Protocol {
2177        message: format!("worker sent unexpected {operation} response: {response:?}"),
2178    }
2179}
2180
/// Converts `path` into an owned `String`.
///
/// The conversion is lossy: any non-UTF-8 sequences are replaced with U+FFFD.
fn path_string(path: &Path) -> String {
    String::from(path.to_string_lossy())
}
2184
2185/// Parent-side collector that decodes per-row JSON-string payloads from
2186/// `list_declarations_strings` into an owned `Vec<String>`.
2187///
2188/// Each name lands as its own `Message::DataRow` so the 1 MiB protocol frame
2189/// cap binds per-name (any single Lean name is well under that) rather than
2190/// per-response. Decoding failures or sink panics are surfaced through the
2191/// usual `LeanWorkerError::Protocol` / `DataSinkPanic` paths.
2192#[derive(Debug, Default)]
2193struct DeclarationNameCollector {
2194    names: Mutex<Vec<String>>,
2195    decode_error: Mutex<Option<String>>,
2196}
2197
2198impl DeclarationNameCollector {
2199    fn into_inner(self) -> Vec<String> {
2200        self.names.into_inner().unwrap_or_default()
2201    }
2202}
2203
2204impl LeanWorkerRawDataSink for DeclarationNameCollector {
2205    fn report(&self, row: LeanWorkerRawDataRow) {
2206        match serde_json::from_str::<String>(row.payload.get()) {
2207            Ok(name) => {
2208                if let Ok(mut guard) = self.names.lock() {
2209                    guard.push(name);
2210                }
2211            }
2212            Err(err) => {
2213                if let Ok(mut slot) = self.decode_error.lock()
2214                    && slot.is_none()
2215                {
2216                    *slot = Some(format!("list_declarations_strings row payload decode failed: {err}"));
2217                }
2218            }
2219        }
2220    }
2221}
2222
/// Returns the resident set size of process `pid` in KiB on Linux.
///
/// The value comes from the `VmRSS:` line of `/proc/<pid>/status`. Returns
/// `None` in either of these cases:
/// - the status file cannot be read (e.g. the process is gone);
/// - no `VmRSS:` line yields a parseable integer.
#[cfg(target_os = "linux")]
fn child_rss_kib(pid: u32) -> Option<u64> {
    let Ok(contents) = std::fs::read_to_string(format!("/proc/{pid}/status")) else {
        return None;
    };
    for line in contents.lines() {
        let Some(value) = line.strip_prefix("VmRSS:") else {
            continue;
        };
        // The line looks like `VmRSS:    1234 kB`; the first token is the number.
        if let Some(kib) = value.split_whitespace().next().and_then(|tok| tok.parse::<u64>().ok()) {
            return Some(kib);
        }
    }
    None
}
2231
/// Returns the resident set size of process `pid` in KiB on non-Linux hosts.
///
/// The value is obtained by running `ps -o rss= -p <pid>`. Returns `None` in
/// any of these cases:
/// - `ps` cannot be spawned;
/// - `ps` exits unsuccessfully;
/// - its output is not an integer;
/// - the reported value is zero.
#[cfg(not(target_os = "linux"))]
fn child_rss_kib(pid: u32) -> Option<u64> {
    let pid_arg = pid.to_string();
    let output = match Command::new("ps").args(["-o", "rss=", "-p", pid_arg.as_str()]).output() {
        Ok(out) if out.status.success() => out,
        _ => return None,
    };
    let kib: u64 = String::from_utf8_lossy(&output.stdout).trim().parse().ok()?;
    (kib > 0).then_some(kib)
}