Skip to main content

lean_rs_worker/
supervisor.rs

1use std::ffi::OsString;
2use std::fmt;
3use std::io::{BufReader, BufWriter, Read as _};
4use std::path::{Path, PathBuf};
5use std::process::{Child, ChildStderr, ChildStdin, ChildStdout, Command, ExitStatus, Stdio};
6use std::sync::mpsc;
7use std::thread;
8use std::time::{Duration, Instant};
9
10use crate::capability::LeanWorkerBootstrapDiagnosticCode;
11use crate::protocol::{Message, Request, Response, read_frame, write_frame};
12use crate::session::LeanWorkerDataSinkTarget;
13use crate::session::{
14    LeanWorkerCancellationToken, LeanWorkerCapabilityMetadata, LeanWorkerDataSink, LeanWorkerDiagnosticSink,
15    LeanWorkerDoctorReport, LeanWorkerElabOptions, LeanWorkerElabResult, LeanWorkerKernelResult,
16    LeanWorkerProgressSink, LeanWorkerRawDataSink, LeanWorkerRuntimeMetadata, LeanWorkerSessionConfig,
17    LeanWorkerStreamSummary, check_cancelled, elapsed_event, report_parent_data_row, report_parent_diagnostic,
18    report_parent_progress,
19};
20
/// Handshake deadline installed by `LeanWorkerConfig::new`; callers replace it
/// with `LeanWorkerConfig::startup_timeout`.
const DEFAULT_STARTUP_TIMEOUT: Duration = Duration::from_secs(10);
// NOTE(review): not referenced in this part of the file; presumably the capacity of
// the bounded worker-event buffer that `LeanWorkerStats::backpressure_waits` refers
// to — confirm against the streaming reader.
const WORKER_EVENT_BUFFER_CAPACITY: usize = 64;

/// Default deadline for one worker request after startup.
///
/// Installed by `LeanWorkerConfig::new`.
pub const LEAN_WORKER_REQUEST_TIMEOUT_DEFAULT: Duration = Duration::from_secs(30);

/// Suggested deadline for long-running worker requests.
///
/// Installed by `LeanWorkerConfig::long_running_requests`.
pub const LEAN_WORKER_REQUEST_TIMEOUT_LONG_RUNNING: Duration = Duration::from_mins(10);
29
/// Configuration for starting a `lean-rs-worker` child process.
///
/// The executable should be the `lean-rs-worker-child` binary. The supervisor
/// sets `LEAN_ABORT_ON_PANIC=1` by default so Lean internal panics become fatal
/// child exits instead of attempting in-process recovery; explicit environment
/// entries supplied here override that default.
///
/// `LeanWorker::spawn` also sets `RUST_BACKTRACE=0` before applying the entries
/// supplied here, so that value can be overridden the same way.
#[derive(Clone, Debug)]
pub struct LeanWorkerConfig {
    /// Path of the child binary handed to `Command::new`.
    executable: PathBuf,
    /// Working directory for the child; when `None`, `spawn` does not set one.
    current_dir: Option<PathBuf>,
    /// Extra environment entries, applied in insertion order after the built-in ones.
    env: Vec<(OsString, OsString)>,
    /// Maximum time `spawn` waits for the startup handshake.
    startup_timeout: Duration,
    /// Maximum time to wait for one request's terminal response.
    request_timeout: Duration,
    /// Conditions under which the child is cycled before a request.
    restart_policy: LeanWorkerRestartPolicy,
}
45
46impl LeanWorkerConfig {
47    /// Create a worker configuration for a child executable.
48    pub fn new(executable: impl Into<PathBuf>) -> Self {
49        Self {
50            executable: executable.into(),
51            current_dir: None,
52            env: Vec::new(),
53            startup_timeout: DEFAULT_STARTUP_TIMEOUT,
54            request_timeout: LEAN_WORKER_REQUEST_TIMEOUT_DEFAULT,
55            restart_policy: LeanWorkerRestartPolicy::default(),
56        }
57    }
58
59    /// Return the child executable path.
60    pub fn executable(&self) -> &Path {
61        &self.executable
62    }
63
64    /// Set the child working directory.
65    #[must_use]
66    pub fn current_dir(mut self, path: impl Into<PathBuf>) -> Self {
67        self.current_dir = Some(path.into());
68        self
69    }
70
71    /// Add or override one child environment variable.
72    #[must_use]
73    pub fn env(mut self, key: impl Into<OsString>, value: impl Into<OsString>) -> Self {
74        self.env.push((key.into(), value.into()));
75        self
76    }
77
78    /// Set the maximum time to wait for the child handshake.
79    #[must_use]
80    pub fn startup_timeout(mut self, timeout: Duration) -> Self {
81        self.startup_timeout = timeout;
82        self
83    }
84
85    /// Set the maximum time to wait for one request's terminal response.
86    ///
87    /// The request timeout starts after the request frame is written. It covers
88    /// live rows, diagnostics, progress events, and the terminal response. On
89    /// timeout, the supervisor kills and replaces the child process.
90    #[must_use]
91    pub fn request_timeout(mut self, timeout: Duration) -> Self {
92        self.request_timeout = timeout;
93        self
94    }
95
96    /// Use the documented long-running request timeout profile.
97    #[must_use]
98    pub fn long_running_requests(mut self) -> Self {
99        self.request_timeout = LEAN_WORKER_REQUEST_TIMEOUT_LONG_RUNNING;
100        self
101    }
102
103    /// Set the worker restart policy.
104    ///
105    /// Policy checks run before requests enter the child. A policy restart is a
106    /// process restart; it is the only supported reset for Lean process-global
107    /// runtime and import state.
108    #[must_use]
109    pub fn restart_policy(mut self, policy: LeanWorkerRestartPolicy) -> Self {
110        self.restart_policy = policy;
111        self
112    }
113}
114
/// Policy for cycling a worker child before the next request.
///
/// The policy resets retained Lean runtime memory only by restarting the
/// process. It does not change `lean-rs-host`'s in-process memory model, and it
/// does not imply that `SessionPool::drain()` can return process-global Lean
/// memory to the OS.
///
/// The derived `Default` leaves every limit unset (`None`); that is the value
/// `LeanWorkerRestartPolicy::disabled` returns.
#[derive(Clone, Debug, Default, Eq, PartialEq)]
pub struct LeanWorkerRestartPolicy {
    /// Request-count limit; the builder stores at least 1.
    max_requests: Option<u64>,
    /// Import-like request-count limit; the builder stores at least 1.
    max_imports: Option<u64>,
    /// Child RSS ceiling in KiB; the builder stores at least 1.
    max_rss_kib: Option<u64>,
    /// Idle duration after which the child is cycled; stored as given.
    idle_restart_after: Option<Duration>,
}
128
129impl LeanWorkerRestartPolicy {
130    /// Disable automatic policy restarts.
131    #[must_use]
132    pub fn disabled() -> Self {
133        Self::default()
134    }
135
136    /// Restart before a request when this many requests have entered the child.
137    #[must_use]
138    pub fn max_requests(mut self, limit: u64) -> Self {
139        self.max_requests = Some(limit.max(1));
140        self
141    }
142
143    /// Restart before an import-like request when this many imports have run.
144    #[must_use]
145    pub fn max_imports(mut self, limit: u64) -> Self {
146        self.max_imports = Some(limit.max(1));
147        self
148    }
149
150    /// Restart before a request when measured child RSS is at least this many KiB.
151    ///
152    /// RSS measurement is best effort. It is implemented for the current
153    /// supported Unix development targets; unsupported platforms skip the
154    /// check and increment `LeanWorkerStats::rss_samples_unavailable`.
155    #[must_use]
156    pub fn max_rss_kib(mut self, limit: u64) -> Self {
157        self.max_rss_kib = Some(limit.max(1));
158        self
159    }
160
161    /// Restart before a request when the worker has been idle for this long.
162    #[must_use]
163    pub fn idle_restart_after(mut self, duration: Duration) -> Self {
164        self.idle_restart_after = Some(duration);
165        self
166    }
167}
168
/// Reason recorded for the latest worker cycle.
///
/// `LeanWorkerStats::record_restart` stores the value in
/// `LeanWorkerStats::last_restart_reason` and bumps one dedicated counter per
/// variant in addition to the overall restart count.
#[derive(Clone, Debug, Eq, PartialEq)]
pub enum LeanWorkerRestartReason {
    /// The caller explicitly requested a process cycle.
    Explicit,
    /// Request count reached the configured limit before the next request.
    MaxRequests { limit: u64 },
    /// Import-like request count reached the configured limit before the next import.
    MaxImports { limit: u64 },
    /// Child resident set size reached the configured limit.
    RssCeiling { current_kib: u64, limit_kib: u64 },
    /// Worker was idle at least as long as the configured limit.
    Idle { idle_for: Duration, limit: Duration },
    /// Parent-side cancellation replaced the child during an in-flight request.
    Cancelled { operation: &'static str },
    /// Parent-side request timeout replaced the child during an in-flight request.
    RequestTimeout {
        operation: &'static str,
        duration: Duration,
    },
}
190
/// Snapshot of worker lifecycle counters.
#[derive(Clone, Debug, Default, Eq, PartialEq)]
pub struct LeanWorkerStats {
    /// Requests that entered a worker child.
    pub requests: u64,
    /// Import-like requests that entered a worker child.
    pub imports: u64,
    /// Child exits observed by the supervisor, including policy cycles.
    pub exits: u64,
    /// Restarts recorded by the supervisor for any `LeanWorkerRestartReason`
    /// (policy, explicit, cancellation, or request timeout).
    pub restarts: u64,
    /// Explicit process cycles.
    pub explicit_cycles: u64,
    /// Restarts caused by `LeanWorkerRestartPolicy::max_requests`.
    pub max_request_restarts: u64,
    /// Restarts caused by `LeanWorkerRestartPolicy::max_imports`.
    pub max_import_restarts: u64,
    /// Restarts caused by `LeanWorkerRestartPolicy::max_rss_kib`.
    pub rss_restarts: u64,
    /// Restarts caused by `LeanWorkerRestartPolicy::idle_restart_after`.
    pub idle_restarts: u64,
    /// Restarts caused by parent-side cancellation of an in-flight request.
    pub cancelled_restarts: u64,
    /// Restarts caused by parent-side request timeouts.
    pub timeout_restarts: u64,
    /// RSS checks skipped because the platform did not provide a usable sample.
    pub rss_samples_unavailable: u64,
    /// Last measured child RSS in KiB, when a policy check could sample it.
    pub last_rss_kib: Option<u64>,
    /// Most recent restart reason, if any.
    pub last_restart_reason: Option<LeanWorkerRestartReason>,
    /// Streaming requests that entered a worker child.
    pub stream_requests: u64,
    /// Streaming requests that reached terminal success.
    pub stream_successes: u64,
    /// Streaming requests that failed after entering the child.
    pub stream_failures: u64,
    /// Data rows delivered to parent-side sinks.
    pub data_rows_delivered: u64,
    /// Raw row payload bytes delivered to parent-side sinks.
    pub data_row_payload_bytes: u64,
    /// Total elapsed time spent in streaming requests.
    pub stream_elapsed: Duration,
    /// Times the bounded worker-event reader had to wait for the parent to drain events.
    pub backpressure_waits: u64,
    /// Streaming requests that failed after bounded-buffer backpressure was observed.
    pub backpressure_failures: u64,
}
239
240impl LeanWorkerStats {
241    fn record_restart(&mut self, reason: LeanWorkerRestartReason) {
242        self.restarts = self.restarts.saturating_add(1);
243        match &reason {
244            LeanWorkerRestartReason::Explicit => {
245                self.explicit_cycles = self.explicit_cycles.saturating_add(1);
246            }
247            LeanWorkerRestartReason::MaxRequests { .. } => {
248                self.max_request_restarts = self.max_request_restarts.saturating_add(1);
249            }
250            LeanWorkerRestartReason::MaxImports { .. } => {
251                self.max_import_restarts = self.max_import_restarts.saturating_add(1);
252            }
253            LeanWorkerRestartReason::RssCeiling { .. } => {
254                self.rss_restarts = self.rss_restarts.saturating_add(1);
255            }
256            LeanWorkerRestartReason::Idle { .. } => {
257                self.idle_restarts = self.idle_restarts.saturating_add(1);
258            }
259            LeanWorkerRestartReason::Cancelled { .. } => {
260                self.cancelled_restarts = self.cancelled_restarts.saturating_add(1);
261            }
262            LeanWorkerRestartReason::RequestTimeout { .. } => {
263                self.timeout_restarts = self.timeout_restarts.saturating_add(1);
264            }
265        }
266        self.last_restart_reason = Some(reason);
267    }
268}
269
/// Public lifecycle state for a worker child.
///
/// The exited state carries the rendered exit information rather than a raw
/// `std::process::ExitStatus`.
#[derive(Clone, Debug, Eq, PartialEq)]
pub enum LeanWorkerStatus {
    /// The worker process is still running.
    Running,
    /// The worker process has exited.
    Exited(LeanWorkerExit),
}
278
/// Rendered child-process exit information.
///
/// Values are produced from a `std::process::ExitStatus` by
/// `LeanWorkerExit::from_status`, so the snapshot owns all of its data.
#[derive(Clone, Debug, Eq, PartialEq)]
pub struct LeanWorkerExit {
    /// Whether the child process exited successfully.
    pub success: bool,
    /// The platform exit code when one is available.
    pub code: Option<i32>,
    /// The platform-rendered process status.
    pub status: String,
    /// Captured child diagnostics, if available.
    // NOTE(review): presumably the child's stderr text — confirm against `wait_with_stderr`.
    pub diagnostics: String,
}
291
292impl LeanWorkerExit {
293    fn from_status(status: ExitStatus, diagnostics: String) -> Self {
294        Self {
295            success: status.success(),
296            code: status.code(),
297            status: status.to_string(),
298            diagnostics,
299        }
300    }
301}
302
/// Errors reported by the worker supervisor.
///
/// `Display` renders one message per variant. Only `Spawn`, `Wait`, and
/// `CapabilityBuild` expose an underlying cause through
/// `std::error::Error::source`; every other variant carries its detail in its
/// own fields.
#[derive(Debug)]
pub enum LeanWorkerError {
    /// The worker child could not be spawned.
    Spawn {
        executable: PathBuf,
        source: std::io::Error,
    },
    /// The default worker child executable could not be resolved.
    WorkerChildUnresolved {
        /// Candidate paths checked by the default resolver.
        tried: Vec<PathBuf>,
    },
    /// The resolved worker child is missing or is not executable.
    WorkerChildNotExecutable { path: PathBuf, reason: String },
    /// Worker bootstrap preflight failed before a real command ran.
    Bootstrap {
        code: LeanWorkerBootstrapDiagnosticCode,
        message: String,
    },
    /// The capability Lake target could not be built.
    CapabilityBuild {
        /// Typed Lake/toolchain diagnostic from `lean-toolchain`.
        diagnostic: lean_toolchain::LinkDiagnostics,
    },
    /// The child process could not be prepared after spawning.
    Setup { message: String },
    /// The child did not complete the startup handshake.
    Handshake { message: String },
    /// The worker protocol failed after the handshake.
    Protocol { message: String },
    /// The child returned a typed worker error.
    Worker { code: String, message: String },
    /// The child exited while a request was in flight.
    ChildExited { exit: LeanWorkerExit },
    /// The child exited fatally while a request was in flight.
    ChildPanicOrAbort { exit: LeanWorkerExit },
    /// A worker operation timed out.
    Timeout {
        operation: &'static str,
        duration: Duration,
    },
    /// A parent-side cancellation token was observed.
    Cancelled { operation: &'static str },
    /// A parent-side progress sink panicked while handling a worker event.
    ProgressPanic { message: String },
    /// A parent-side data sink panicked while handling a worker row.
    DataSinkPanic { message: String },
    /// A parent-side diagnostic sink panicked while handling a worker diagnostic.
    DiagnosticSinkPanic { message: String },
    /// A streaming export returned a nonzero downstream status byte.
    StreamExportFailed { status: u8 },
    /// The in-child string callback helper returned a callback failure status.
    StreamCallbackFailed { status: u8, description: String },
    /// A streaming callback emitted a malformed row envelope.
    StreamRowMalformed { message: String },
    /// A capability metadata export returned malformed JSON.
    CapabilityMetadataMalformed { message: String },
    /// Capability metadata did not match the caller's requested expectation.
    // Both metadata values are boxed, which keeps this variant pointer-sized per field.
    CapabilityMetadataMismatch {
        export: String,
        expected: Box<LeanWorkerCapabilityMetadata>,
        actual: Box<LeanWorkerCapabilityMetadata>,
    },
    /// A capability doctor export returned malformed JSON.
    CapabilityDoctorMalformed { message: String },
    /// A typed command request could not be serialized as JSON.
    TypedCommandRequestEncode { export: String, message: String },
    /// A typed non-streaming command response could not be decoded.
    TypedCommandResponseDecode { export: String, message: String },
    /// A typed streaming command row payload could not be decoded.
    TypedCommandRowDecode {
        export: String,
        stream: String,
        sequence: u64,
        message: String,
    },
    /// A typed streaming command terminal summary could not be decoded.
    TypedCommandSummaryDecode { export: String, message: String },
    /// A pool session lease was invalidated by a worker lifecycle transition.
    LeaseInvalidated { reason: String },
    /// A local worker pool cannot admit another distinct session key.
    WorkerPoolExhausted { max_workers: usize },
    /// A local worker pool cannot admit work without exceeding its RSS budget.
    WorkerPoolMemoryBudgetExceeded { current_kib: u64, limit_kib: u64 },
    /// Waiting for local worker-pool admission exceeded the configured limit.
    WorkerPoolQueueTimeout { waited: Duration },
    /// The public supervisor does not support the requested operation.
    UnsupportedRequest { operation: &'static str },
    /// Waiting for a child process failed.
    Wait { source: std::io::Error },
}
395
/// Human-readable rendering of every `LeanWorkerError` variant.
impl fmt::Display for LeanWorkerError {
    /// Write one message for the variant held by `self`.
    ///
    /// The match lists every variant explicitly, so a newly added variant must
    /// be given a message before the crate compiles.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        match self {
            Self::Spawn { executable, source } => {
                write!(f, "failed to spawn worker {}: {source}", executable.display())
            }
            Self::WorkerChildUnresolved { tried } => {
                // Render the candidate list as a comma-separated string of paths.
                let tried = tried
                    .iter()
                    .map(|path| path.display().to_string())
                    .collect::<Vec<_>>()
                    .join(", ");
                write!(
                    f,
                    "could not resolve lean-rs-worker-child; set LEAN_RS_WORKER_CHILD or place it beside the current executable (tried: {tried})"
                )
            }
            Self::WorkerChildNotExecutable { path, reason } => {
                write!(f, "worker child '{}' is not executable: {reason}", path.display())
            }
            Self::Bootstrap { code, message } => {
                write!(f, "worker bootstrap check {code} failed: {message}")
            }
            Self::CapabilityBuild { diagnostic } => {
                write!(f, "worker capability Lake target build failed: {diagnostic}")
            }
            Self::Setup { message } => write!(f, "worker child setup failed: {message}"),
            Self::Handshake { message } => write!(f, "worker handshake failed: {message}"),
            Self::Protocol { message } => write!(f, "worker protocol failed: {message}"),
            Self::Worker { code, message } => write!(f, "worker returned {code}: {message}"),
            // Both exit variants print only the rendered status; `exit.diagnostics`
            // is not part of the message.
            Self::ChildExited { exit } => write!(f, "worker exited with {}", exit.status),
            Self::ChildPanicOrAbort { exit } => {
                write!(f, "worker exited fatally with {}", exit.status)
            }
            Self::Timeout { operation, duration } => {
                write!(f, "worker operation {operation} timed out after {duration:?}")
            }
            Self::Cancelled { operation } => write!(f, "worker operation {operation} was cancelled"),
            Self::ProgressPanic { message } => write!(f, "worker progress sink panicked: {message}"),
            Self::DataSinkPanic { message } => write!(f, "worker data sink panicked: {message}"),
            Self::DiagnosticSinkPanic { message } => {
                write!(f, "worker diagnostic sink panicked: {message}")
            }
            Self::StreamExportFailed { status } => write!(f, "streaming export returned status {status}"),
            Self::StreamCallbackFailed { status, description } => {
                write!(f, "streaming callback failed with status {status}: {description}")
            }
            Self::StreamRowMalformed { message } => write!(f, "streaming export emitted malformed row: {message}"),
            Self::CapabilityMetadataMalformed { message } => {
                write!(f, "capability metadata export returned malformed JSON: {message}")
            }
            // Only the export name is printed; the expected/actual metadata stay
            // available on the variant's fields.
            Self::CapabilityMetadataMismatch { export, .. } => {
                write!(f, "capability metadata from {export} did not match expectation")
            }
            Self::CapabilityDoctorMalformed { message } => {
                write!(f, "capability doctor export returned malformed JSON: {message}")
            }
            Self::TypedCommandRequestEncode { export, message } => {
                write!(f, "typed worker command {export} request JSON encode failed: {message}")
            }
            Self::TypedCommandResponseDecode { export, message } => {
                write!(
                    f,
                    "typed worker command {export} response JSON decode failed: {message}"
                )
            }
            Self::TypedCommandRowDecode {
                export,
                stream,
                sequence,
                message,
            } => {
                write!(
                    f,
                    "typed worker command {export} row decode failed at stream {stream} sequence {sequence}: {message}"
                )
            }
            Self::TypedCommandSummaryDecode { export, message } => {
                write!(
                    f,
                    "typed worker command {export} terminal summary decode failed: {message}"
                )
            }
            Self::LeaseInvalidated { reason } => write!(f, "worker pool lease was invalidated: {reason}"),
            Self::WorkerPoolExhausted { max_workers } => {
                write!(
                    f,
                    "worker pool cannot admit another session key; max_workers={max_workers}"
                )
            }
            Self::WorkerPoolMemoryBudgetExceeded { current_kib, limit_kib } => {
                write!(
                    f,
                    "worker pool cannot admit work within RSS budget; current_kib={current_kib} limit_kib={limit_kib}"
                )
            }
            Self::WorkerPoolQueueTimeout { waited } => {
                write!(f, "worker pool admission timed out after {waited:?}")
            }
            Self::UnsupportedRequest { operation } => {
                write!(f, "worker operation {operation} is not supported")
            }
            Self::Wait { source } => write!(f, "failed to wait for worker child: {source}"),
        }
    }
}
502
503impl std::error::Error for LeanWorkerError {
504    fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
505        match self {
506            Self::Spawn { source, .. } | Self::Wait { source } => Some(source),
507            Self::CapabilityBuild { diagnostic } => Some(diagnostic),
508            Self::WorkerChildUnresolved { .. } | Self::WorkerChildNotExecutable { .. } | Self::Bootstrap { .. } => None,
509            Self::Setup { .. }
510            | Self::Handshake { .. }
511            | Self::Protocol { .. }
512            | Self::Worker { .. }
513            | Self::ChildExited { .. }
514            | Self::ChildPanicOrAbort { .. }
515            | Self::Timeout { .. }
516            | Self::Cancelled { .. }
517            | Self::ProgressPanic { .. }
518            | Self::DataSinkPanic { .. }
519            | Self::DiagnosticSinkPanic { .. }
520            | Self::StreamExportFailed { .. }
521            | Self::StreamCallbackFailed { .. }
522            | Self::StreamRowMalformed { .. }
523            | Self::CapabilityMetadataMalformed { .. }
524            | Self::CapabilityMetadataMismatch { .. }
525            | Self::CapabilityDoctorMalformed { .. }
526            | Self::TypedCommandRequestEncode { .. }
527            | Self::TypedCommandResponseDecode { .. }
528            | Self::TypedCommandRowDecode { .. }
529            | Self::TypedCommandSummaryDecode { .. }
530            | Self::LeaseInvalidated { .. }
531            | Self::WorkerPoolExhausted { .. }
532            | Self::WorkerPoolMemoryBudgetExceeded { .. }
533            | Self::WorkerPoolQueueTimeout { .. }
534            | Self::UnsupportedRequest { .. } => None,
535        }
536    }
537}
538
/// Supervisor for one `lean-rs-worker` child process.
///
/// Dropping a live supervisor attempts to terminate the child and then waits
/// for it. Drop never panics; explicit `terminate` is preferred when callers
/// need the exit status.
#[derive(Debug)]
pub struct LeanWorker {
    /// Copy of the configuration the child was spawned with.
    config: LeanWorkerConfig,
    /// Handle to the child process.
    // NOTE(review): presumably `None` once the child has been reaped — confirm.
    child: Option<Child>,
    /// Buffered writer over the child's stdin pipe.
    stdin: Option<BufWriter<ChildStdin>>,
    /// Buffered reader over the child's stdout pipe; `spawn` leaves it `None`
    /// on the temporary value it builds after a failed handshake.
    stdout: Option<BufReader<ChildStdout>>,
    /// The child's stderr pipe, when one was available at spawn time.
    stderr: Option<ChildStderr>,
    /// Rendered exit information.
    // NOTE(review): presumably filled when an exit is observed — confirm.
    last_exit: Option<LeanWorkerExit>,
    /// Metadata returned by the startup handshake.
    runtime_metadata: LeanWorkerRuntimeMetadata,
    /// Lifecycle counters; start at their defaults in `spawn`.
    stats: LeanWorkerStats,
    /// Starts at 0 in `spawn`.
    // NOTE(review): presumably compared against `LeanWorkerRestartPolicy::max_requests` — confirm.
    requests_since_restart: u64,
    /// Starts at 0 in `spawn`.
    // NOTE(review): presumably compared against `LeanWorkerRestartPolicy::max_imports` — confirm.
    imports_since_restart: u64,
    /// Set to `Instant::now()` in `spawn`.
    // NOTE(review): presumably the reference point for the idle-restart check — confirm.
    last_activity: Instant,
}
558
559impl LeanWorker {
560    /// Spawn a worker child and wait for its protocol handshake.
561    ///
562    /// # Errors
563    ///
564    /// Returns `LeanWorkerError` if the child cannot be spawned, child setup
565    /// fails, the child exits before handshaking, or the startup timeout
566    /// expires.
567    pub fn spawn(config: &LeanWorkerConfig) -> Result<Self, LeanWorkerError> {
568        let mut command = Command::new(&config.executable);
569        command
570            .stdin(Stdio::piped())
571            .stdout(Stdio::piped())
572            .stderr(Stdio::piped())
573            .env("LEAN_ABORT_ON_PANIC", "1")
574            .env("RUST_BACKTRACE", "0");
575
576        if let Some(current_dir) = &config.current_dir {
577            command.current_dir(current_dir);
578        }
579        for (key, value) in &config.env {
580            command.env(key, value);
581        }
582
583        let mut child = command.spawn().map_err(|source| LeanWorkerError::Spawn {
584            executable: config.executable.clone(),
585            source,
586        })?;
587
588        let stdin = child
589            .stdin
590            .take()
591            .map(BufWriter::new)
592            .ok_or_else(|| LeanWorkerError::Setup {
593                message: "child stdin unavailable".to_owned(),
594            })?;
595        let stdout = child.stdout.take().ok_or_else(|| LeanWorkerError::Setup {
596            message: "child stdout unavailable".to_owned(),
597        })?;
598        let stderr = child.stderr.take();
599
600        let (sender, receiver) = mpsc::channel();
601        let _handshake_reader = thread::spawn(move || {
602            let mut stdout = BufReader::new(stdout);
603            let result = expect_handshake(&mut stdout);
604            drop(sender.send((stdout, result)));
605        });
606
607        let (stdout, runtime_metadata) = match receiver.recv_timeout(config.startup_timeout) {
608            Ok((stdout, Ok(metadata))) => (stdout, metadata),
609            Ok((_stdout, Err(err))) => {
610                let mut worker = Self {
611                    config: config.clone(),
612                    child: Some(child),
613                    stdin: Some(stdin),
614                    stdout: None,
615                    stderr,
616                    last_exit: None,
617                    runtime_metadata: LeanWorkerRuntimeMetadata {
618                        worker_version: String::new(),
619                        protocol_version: crate::protocol::PROTOCOL_VERSION,
620                        lean_version: None,
621                    },
622                    stats: LeanWorkerStats::default(),
623                    requests_since_restart: 0,
624                    imports_since_restart: 0,
625                    last_activity: Instant::now(),
626                };
627                let exit = worker.try_record_exit();
628                return Err(match exit {
629                    Some(exit) if !exit.success => LeanWorkerError::ChildPanicOrAbort { exit },
630                    Some(exit) => LeanWorkerError::ChildExited { exit },
631                    None => err,
632                });
633            }
634            Err(mpsc::RecvTimeoutError::Timeout) => {
635                drop(child.kill());
636                let _exit = wait_with_stderr(&mut child, stderr)?;
637                return Err(LeanWorkerError::Timeout {
638                    operation: "startup",
639                    duration: config.startup_timeout,
640                });
641            }
642            Err(mpsc::RecvTimeoutError::Disconnected) => {
643                return Err(LeanWorkerError::Handshake {
644                    message: "handshake reader exited without a result".to_owned(),
645                });
646            }
647        };
648
649        Ok(Self {
650            config: config.clone(),
651            child: Some(child),
652            stdin: Some(stdin),
653            stdout: Some(stdout),
654            stderr,
655            last_exit: None,
656            runtime_metadata,
657            stats: LeanWorkerStats::default(),
658            requests_since_restart: 0,
659            imports_since_restart: 0,
660            last_activity: Instant::now(),
661        })
662    }
663
664    /// Check whether the worker responds to requests.
665    ///
666    /// # Errors
667    ///
668    /// Returns `LeanWorkerError` if the worker is dead, the protocol fails, or
669    /// the child returns a typed worker error.
670    pub fn health(&mut self) -> Result<(), LeanWorkerError> {
671        self.prepare_request(false)?;
672        self.send_request(Request::Health)?;
673        self.record_request(false);
674        match self.read_response("health")? {
675            Response::HealthOk => Ok(()),
676            other @ (Response::CapabilityLoaded
677            | Response::U64 { .. }
678            | Response::HostSessionOpened
679            | Response::Elaboration { .. }
680            | Response::KernelCheck { .. }
681            | Response::Strings { .. }
682            | Response::StreamComplete { .. }
683            | Response::StreamExportFailed { .. }
684            | Response::StreamCallbackFailed { .. }
685            | Response::StreamRowMalformed { .. }
686            | Response::CapabilityMetadata { .. }
687            | Response::CapabilityDoctor { .. }
688            | Response::CapabilityMetadataMalformed { .. }
689            | Response::CapabilityDoctorMalformed { .. }
690            | Response::JsonCommand { .. }
691            | Response::RowsComplete { .. }
692            | Response::Terminating
693            | Response::Error { .. }) => Err(unexpected_response("health", &other)),
694        }
695    }
696
697    /// Load the in-tree fixture capability in the worker child.
698    ///
699    /// This is a fixture-only entry point used to exercise the supervisor path
700    /// in tests. The supported public path is `open_session`, which returns the
701    /// host-session adapter instead of expanding this fixture surface.
702    ///
703    /// # Errors
704    ///
705    /// Returns `LeanWorkerError` if the worker is dead, fixture loading fails,
706    /// or protocol communication fails.
707    pub fn load_fixture_capability(&mut self, fixture_root: impl AsRef<Path>) -> Result<(), LeanWorkerError> {
708        self.prepare_request(true)?;
709        self.send_request(Request::LoadFixtureCapability {
710            fixture_root: path_string(fixture_root.as_ref()),
711        })?;
712        self.record_request(true);
713        match self.read_response("load_fixture_capability")? {
714            Response::CapabilityLoaded => Ok(()),
715            other @ (Response::HealthOk
716            | Response::U64 { .. }
717            | Response::HostSessionOpened
718            | Response::Elaboration { .. }
719            | Response::KernelCheck { .. }
720            | Response::Strings { .. }
721            | Response::StreamComplete { .. }
722            | Response::StreamExportFailed { .. }
723            | Response::StreamCallbackFailed { .. }
724            | Response::StreamRowMalformed { .. }
725            | Response::CapabilityMetadata { .. }
726            | Response::CapabilityDoctor { .. }
727            | Response::CapabilityMetadataMalformed { .. }
728            | Response::CapabilityDoctorMalformed { .. }
729            | Response::JsonCommand { .. }
730            | Response::RowsComplete { .. }
731            | Response::Terminating
732            | Response::Error { .. }) => Err(unexpected_response("load_fixture_capability", &other)),
733        }
734    }
735
736    /// Call the prompt fixture multiplication export in the worker child.
737    ///
738    /// # Errors
739    ///
740    /// Returns `LeanWorkerError` if the worker is dead, the export fails, or
741    /// protocol communication fails.
742    pub fn call_fixture_mul(
743        &mut self,
744        fixture_root: impl AsRef<Path>,
745        lhs: u64,
746        rhs: u64,
747    ) -> Result<u64, LeanWorkerError> {
748        self.prepare_request(true)?;
749        self.send_request(Request::CallFixtureMul {
750            fixture_root: path_string(fixture_root.as_ref()),
751            lhs,
752            rhs,
753        })?;
754        self.record_request(true);
755        match self.read_response("call_fixture_mul")? {
756            Response::U64 { value } => Ok(value),
757            other @ (Response::HealthOk
758            | Response::CapabilityLoaded
759            | Response::HostSessionOpened
760            | Response::Elaboration { .. }
761            | Response::KernelCheck { .. }
762            | Response::Strings { .. }
763            | Response::StreamComplete { .. }
764            | Response::StreamExportFailed { .. }
765            | Response::StreamCallbackFailed { .. }
766            | Response::StreamRowMalformed { .. }
767            | Response::CapabilityMetadata { .. }
768            | Response::CapabilityDoctor { .. }
769            | Response::CapabilityMetadataMalformed { .. }
770            | Response::CapabilityDoctorMalformed { .. }
771            | Response::JsonCommand { .. }
772            | Response::RowsComplete { .. }
773            | Response::Terminating
774            | Response::Error { .. }) => Err(unexpected_response("call_fixture_mul", &other)),
775        }
776    }
777
778    /// Return the current worker lifecycle status.
779    ///
780    /// # Errors
781    ///
782    /// Returns `LeanWorkerError` if checking the process status fails.
783    pub fn status(&mut self) -> Result<LeanWorkerStatus, LeanWorkerError> {
784        if let Some(exit) = &self.last_exit {
785            return Ok(LeanWorkerStatus::Exited(exit.clone()));
786        }
787        let Some(child) = self.child.as_mut() else {
788            return Ok(LeanWorkerStatus::Exited(LeanWorkerExit {
789                success: false,
790                code: None,
791                status: "worker is not running".to_owned(),
792                diagnostics: String::new(),
793            }));
794        };
795        match child.try_wait().map_err(|source| LeanWorkerError::Wait { source })? {
796            Some(status) => {
797                let diagnostics = self.read_stderr();
798                let exit = LeanWorkerExit::from_status(status, diagnostics);
799                self.last_exit = Some(exit.clone());
800                self.child = None;
801                self.stdin = None;
802                self.stdout = None;
803                self.stats.exits = self.stats.exits.saturating_add(1);
804                Ok(LeanWorkerStatus::Exited(exit))
805            }
806            None => Ok(LeanWorkerStatus::Running),
807        }
808    }
809
810    /// Return lifecycle counters for this supervisor.
811    #[must_use]
812    pub fn stats(&self) -> LeanWorkerStats {
813        self.stats.clone()
814    }
815
816    /// Return protocol/runtime facts reported by the worker child.
817    #[must_use]
818    pub fn runtime_metadata(&self) -> LeanWorkerRuntimeMetadata {
819        self.runtime_metadata.clone()
820    }
821
822    /// Measure the current child RSS in KiB when supported by the platform.
823    ///
824    /// This is an observability hook for restart policy and memory-cycling
825    /// workloads. A `None` result means the platform did not provide a usable
826    /// sample; it is not a worker failure.
827    pub fn rss_kib(&mut self) -> Option<u64> {
828        match self.child_rss_kib() {
829            Some(value) => {
830                self.stats.last_rss_kib = Some(value);
831                Some(value)
832            }
833            None => {
834                self.stats.rss_samples_unavailable = self.stats.rss_samples_unavailable.saturating_add(1);
835                None
836            }
837        }
838    }
839
840    /// Return the timeout used for subsequent worker requests.
841    #[must_use]
842    pub fn request_timeout(&self) -> Duration {
843        self.config.request_timeout
844    }
845
846    /// Change the timeout for subsequent worker requests.
847    ///
848    /// This changes supervisor policy only. The supervisor still owns the
849    /// deadline, child kill, replacement, and restart accounting.
850    pub fn set_request_timeout(&mut self, timeout: Duration) {
851        self.config.request_timeout = timeout;
852    }
853
854    /// Explicitly cycle the worker process.
855    ///
856    /// This is the manual memory-reset operation. It terminates the current
857    /// child, starts a replacement with the original configuration, and records
858    /// `LeanWorkerRestartReason::Explicit`.
859    ///
860    /// # Errors
861    ///
862    /// Returns `LeanWorkerError` if the existing child cannot be waited on or
863    /// the replacement child cannot be spawned and handshaken.
864    pub fn cycle(&mut self) -> Result<(), LeanWorkerError> {
865        self.restart_with_reason(LeanWorkerRestartReason::Explicit)
866    }
867
868    pub(crate) fn cycle_with_restart_reason(&mut self, reason: LeanWorkerRestartReason) -> Result<(), LeanWorkerError> {
869        self.restart_with_reason(reason)
870    }
871
872    /// Restart this worker using its original configuration.
873    ///
874    /// This is an explicit lifecycle operation. Prompt 58 adds policy-driven
875    /// restarts for memory cycling; this method only gives callers a direct
876    /// reset point.
877    ///
878    /// # Errors
879    ///
880    /// Returns `LeanWorkerError` if the existing child cannot be waited on or
881    /// the replacement child cannot be spawned and handshaken.
882    pub fn restart(&mut self) -> Result<(), LeanWorkerError> {
883        self.cycle()
884    }
885
886    #[doc(hidden)]
887    /// Kill the child process for supervisor tests.
888    ///
889    /// # Errors
890    ///
891    /// Returns `LeanWorkerError` if the worker is already dead or the OS kill
892    /// request fails.
893    pub fn __kill_for_test(&mut self) -> Result<(), LeanWorkerError> {
894        let Some(child) = self.child.as_mut() else {
895            return Err(self.dead_error());
896        };
897        child.kill().map_err(|source| LeanWorkerError::Wait { source })?;
898        Ok(())
899    }
900
901    /// Ask the child to terminate cleanly and wait for it.
902    ///
903    /// # Errors
904    ///
905    /// Returns `LeanWorkerError` if the worker is already dead, the protocol
906    /// fails, or waiting for the child process fails.
907    pub fn terminate(mut self) -> Result<LeanWorkerExit, LeanWorkerError> {
908        self.send_request(Request::Terminate)?;
909        match self.read_response("terminate")? {
910            Response::Terminating => self.wait_for_exit(),
911            other @ (Response::HealthOk
912            | Response::CapabilityLoaded
913            | Response::U64 { .. }
914            | Response::HostSessionOpened
915            | Response::Elaboration { .. }
916            | Response::KernelCheck { .. }
917            | Response::Strings { .. }
918            | Response::StreamComplete { .. }
919            | Response::StreamExportFailed { .. }
920            | Response::StreamCallbackFailed { .. }
921            | Response::StreamRowMalformed { .. }
922            | Response::CapabilityMetadata { .. }
923            | Response::CapabilityDoctor { .. }
924            | Response::CapabilityMetadataMalformed { .. }
925            | Response::CapabilityDoctorMalformed { .. }
926            | Response::JsonCommand { .. }
927            | Response::RowsComplete { .. }
928            | Response::Error { .. }) => Err(unexpected_response("terminate", &other)),
929        }
930    }
931
932    #[doc(hidden)]
933    /// Trigger the prompt fixture panic path.
934    ///
935    /// # Errors
936    ///
937    /// Returns `LeanWorkerError` if the worker does not exit fatally or if the
938    /// protocol fails before the panic path runs.
939    pub fn __trigger_lean_panic_fixture(
940        mut self,
941        fixture_root: impl AsRef<Path>,
942    ) -> Result<LeanWorkerExit, LeanWorkerError> {
943        self.prepare_request(true)?;
944        self.send_request(Request::TriggerLeanPanic {
945            fixture_root: path_string(fixture_root.as_ref()),
946        })?;
947        self.record_request(true);
948        match self.read_response("trigger_lean_panic") {
949            Ok(response) => Err(unexpected_response("trigger_lean_panic", &response)),
950            Err(LeanWorkerError::ChildPanicOrAbort { exit }) => Ok(exit),
951            Err(err) => Err(err),
952        }
953    }
954
955    #[doc(hidden)]
956    /// Emit synthetic worker data rows through the private protocol for row sink tests.
957    ///
958    /// # Errors
959    ///
960    /// Returns `LeanWorkerError` if the worker is dead, the sink panics,
961    /// cancellation is observed, or protocol communication fails.
962    pub fn __emit_test_rows(
963        &mut self,
964        streams: Vec<String>,
965        cancellation: Option<&LeanWorkerCancellationToken>,
966        data: Option<&dyn LeanWorkerDataSink>,
967    ) -> Result<u64, LeanWorkerError> {
968        const OPERATION: &str = "emit_test_rows";
969        check_cancelled(OPERATION, cancellation)?;
970        self.prepare_request(false)?;
971        self.send_request(Request::EmitTestRows { streams })?;
972        self.record_request(false);
973        match self.read_response_with_events(
974            OPERATION,
975            None,
976            cancellation,
977            data.map(LeanWorkerDataSinkTarget::Value),
978            None,
979        )? {
980            Response::RowsComplete { count } => Ok(count),
981            other @ (Response::HealthOk
982            | Response::CapabilityLoaded
983            | Response::U64 { .. }
984            | Response::HostSessionOpened
985            | Response::Elaboration { .. }
986            | Response::KernelCheck { .. }
987            | Response::Strings { .. }
988            | Response::StreamComplete { .. }
989            | Response::StreamExportFailed { .. }
990            | Response::StreamCallbackFailed { .. }
991            | Response::StreamRowMalformed { .. }
992            | Response::CapabilityMetadata { .. }
993            | Response::CapabilityDoctor { .. }
994            | Response::CapabilityMetadataMalformed { .. }
995            | Response::CapabilityDoctorMalformed { .. }
996            | Response::JsonCommand { .. }
997            | Response::Terminating
998            | Response::Error { .. }) => Err(unexpected_response(OPERATION, &other)),
999        }
1000    }
1001
1002    pub(crate) fn open_worker_session(
1003        &mut self,
1004        config: &LeanWorkerSessionConfig,
1005        cancellation: Option<&LeanWorkerCancellationToken>,
1006        progress: Option<&dyn LeanWorkerProgressSink>,
1007    ) -> Result<(), LeanWorkerError> {
1008        const OPERATION: &str = "open_worker_session";
1009        check_cancelled(OPERATION, cancellation)?;
1010        self.prepare_request(true)?;
1011        self.send_request(Request::OpenHostSession {
1012            project_root: config.project_root_string(),
1013            package: config.package().to_owned(),
1014            lib_name: config.lib_name().to_owned(),
1015            imports: config.imports().to_vec(),
1016        })?;
1017        self.record_request(true);
1018        match self.read_response_with_progress(OPERATION, progress, cancellation)? {
1019            Response::HostSessionOpened => Ok(()),
1020            other @ (Response::HealthOk
1021            | Response::CapabilityLoaded
1022            | Response::U64 { .. }
1023            | Response::Elaboration { .. }
1024            | Response::KernelCheck { .. }
1025            | Response::Strings { .. }
1026            | Response::StreamComplete { .. }
1027            | Response::StreamExportFailed { .. }
1028            | Response::StreamCallbackFailed { .. }
1029            | Response::StreamRowMalformed { .. }
1030            | Response::CapabilityMetadata { .. }
1031            | Response::CapabilityDoctor { .. }
1032            | Response::CapabilityMetadataMalformed { .. }
1033            | Response::CapabilityDoctorMalformed { .. }
1034            | Response::JsonCommand { .. }
1035            | Response::RowsComplete { .. }
1036            | Response::Terminating
1037            | Response::Error { .. }) => Err(unexpected_response(OPERATION, &other)),
1038        }
1039    }
1040
1041    pub(crate) fn worker_elaborate(
1042        &mut self,
1043        source: &str,
1044        options: &LeanWorkerElabOptions,
1045        cancellation: Option<&LeanWorkerCancellationToken>,
1046        progress: Option<&dyn LeanWorkerProgressSink>,
1047    ) -> Result<LeanWorkerElabResult, LeanWorkerError> {
1048        const OPERATION: &str = "worker_elaborate";
1049        check_cancelled(OPERATION, cancellation)?;
1050        self.prepare_request(false)?;
1051        self.send_request(Request::Elaborate {
1052            source: source.to_owned(),
1053            options: options.wire(),
1054        })?;
1055        self.record_request(false);
1056        match self.read_response_with_progress(OPERATION, progress, cancellation)? {
1057            Response::Elaboration { outcome } => Ok(outcome.into()),
1058            other @ (Response::HealthOk
1059            | Response::CapabilityLoaded
1060            | Response::U64 { .. }
1061            | Response::HostSessionOpened
1062            | Response::KernelCheck { .. }
1063            | Response::Strings { .. }
1064            | Response::StreamComplete { .. }
1065            | Response::StreamExportFailed { .. }
1066            | Response::StreamCallbackFailed { .. }
1067            | Response::StreamRowMalformed { .. }
1068            | Response::CapabilityMetadata { .. }
1069            | Response::CapabilityDoctor { .. }
1070            | Response::CapabilityMetadataMalformed { .. }
1071            | Response::CapabilityDoctorMalformed { .. }
1072            | Response::JsonCommand { .. }
1073            | Response::RowsComplete { .. }
1074            | Response::Terminating
1075            | Response::Error { .. }) => Err(unexpected_response(OPERATION, &other)),
1076        }
1077    }
1078
1079    pub(crate) fn worker_kernel_check(
1080        &mut self,
1081        source: &str,
1082        options: &LeanWorkerElabOptions,
1083        cancellation: Option<&LeanWorkerCancellationToken>,
1084        progress: Option<&dyn LeanWorkerProgressSink>,
1085    ) -> Result<LeanWorkerKernelResult, LeanWorkerError> {
1086        const OPERATION: &str = "worker_kernel_check";
1087        check_cancelled(OPERATION, cancellation)?;
1088        self.prepare_request(false)?;
1089        self.send_request(Request::KernelCheck {
1090            source: source.to_owned(),
1091            options: options.wire(),
1092            progress: progress.is_some(),
1093        })?;
1094        self.record_request(false);
1095        match self.read_response_with_progress(OPERATION, progress, cancellation)? {
1096            Response::KernelCheck { outcome } => Ok(outcome.into()),
1097            other @ (Response::HealthOk
1098            | Response::CapabilityLoaded
1099            | Response::U64 { .. }
1100            | Response::HostSessionOpened
1101            | Response::Elaboration { .. }
1102            | Response::Strings { .. }
1103            | Response::StreamComplete { .. }
1104            | Response::StreamExportFailed { .. }
1105            | Response::StreamCallbackFailed { .. }
1106            | Response::StreamRowMalformed { .. }
1107            | Response::CapabilityMetadata { .. }
1108            | Response::CapabilityDoctor { .. }
1109            | Response::CapabilityMetadataMalformed { .. }
1110            | Response::CapabilityDoctorMalformed { .. }
1111            | Response::JsonCommand { .. }
1112            | Response::RowsComplete { .. }
1113            | Response::Terminating
1114            | Response::Error { .. }) => Err(unexpected_response(OPERATION, &other)),
1115        }
1116    }
1117
1118    pub(crate) fn worker_declaration_kinds(
1119        &mut self,
1120        names: &[&str],
1121        cancellation: Option<&LeanWorkerCancellationToken>,
1122        progress: Option<&dyn LeanWorkerProgressSink>,
1123    ) -> Result<Vec<String>, LeanWorkerError> {
1124        const OPERATION: &str = "worker_declaration_kinds";
1125        check_cancelled(OPERATION, cancellation)?;
1126        self.prepare_request(false)?;
1127        self.send_request(Request::DeclarationKinds {
1128            names: names.iter().map(|name| (*name).to_owned()).collect(),
1129            progress: progress.is_some(),
1130        })?;
1131        self.record_request(false);
1132        match self.read_response_with_progress(OPERATION, progress, cancellation)? {
1133            Response::Strings { values } => Ok(values),
1134            other @ (Response::HealthOk
1135            | Response::CapabilityLoaded
1136            | Response::U64 { .. }
1137            | Response::HostSessionOpened
1138            | Response::Elaboration { .. }
1139            | Response::KernelCheck { .. }
1140            | Response::StreamComplete { .. }
1141            | Response::StreamExportFailed { .. }
1142            | Response::StreamCallbackFailed { .. }
1143            | Response::StreamRowMalformed { .. }
1144            | Response::CapabilityMetadata { .. }
1145            | Response::CapabilityDoctor { .. }
1146            | Response::CapabilityMetadataMalformed { .. }
1147            | Response::CapabilityDoctorMalformed { .. }
1148            | Response::JsonCommand { .. }
1149            | Response::RowsComplete { .. }
1150            | Response::Terminating
1151            | Response::Error { .. }) => Err(unexpected_response(OPERATION, &other)),
1152        }
1153    }
1154
1155    pub(crate) fn worker_declaration_names(
1156        &mut self,
1157        names: &[&str],
1158        cancellation: Option<&LeanWorkerCancellationToken>,
1159        progress: Option<&dyn LeanWorkerProgressSink>,
1160    ) -> Result<Vec<String>, LeanWorkerError> {
1161        const OPERATION: &str = "worker_declaration_names";
1162        check_cancelled(OPERATION, cancellation)?;
1163        self.prepare_request(false)?;
1164        self.send_request(Request::DeclarationNames {
1165            names: names.iter().map(|name| (*name).to_owned()).collect(),
1166            progress: progress.is_some(),
1167        })?;
1168        self.record_request(false);
1169        match self.read_response_with_progress(OPERATION, progress, cancellation)? {
1170            Response::Strings { values } => Ok(values),
1171            other @ (Response::HealthOk
1172            | Response::CapabilityLoaded
1173            | Response::U64 { .. }
1174            | Response::HostSessionOpened
1175            | Response::Elaboration { .. }
1176            | Response::KernelCheck { .. }
1177            | Response::StreamComplete { .. }
1178            | Response::StreamExportFailed { .. }
1179            | Response::StreamCallbackFailed { .. }
1180            | Response::StreamRowMalformed { .. }
1181            | Response::CapabilityMetadata { .. }
1182            | Response::CapabilityDoctor { .. }
1183            | Response::CapabilityMetadataMalformed { .. }
1184            | Response::CapabilityDoctorMalformed { .. }
1185            | Response::JsonCommand { .. }
1186            | Response::RowsComplete { .. }
1187            | Response::Terminating
1188            | Response::Error { .. }) => Err(unexpected_response(OPERATION, &other)),
1189        }
1190    }
1191
1192    pub(crate) fn worker_run_data_stream(
1193        &mut self,
1194        export: &str,
1195        request: &serde_json::Value,
1196        rows: &dyn LeanWorkerDataSink,
1197        diagnostics: Option<&dyn LeanWorkerDiagnosticSink>,
1198        cancellation: Option<&LeanWorkerCancellationToken>,
1199        progress: Option<&dyn LeanWorkerProgressSink>,
1200    ) -> Result<LeanWorkerStreamSummary, LeanWorkerError> {
1201        self.worker_run_data_stream_with_sink(
1202            export,
1203            request,
1204            LeanWorkerDataSinkTarget::Value(rows),
1205            diagnostics,
1206            cancellation,
1207            progress,
1208        )
1209    }
1210
1211    pub(crate) fn worker_run_data_stream_raw(
1212        &mut self,
1213        export: &str,
1214        request: &serde_json::Value,
1215        rows: &dyn LeanWorkerRawDataSink,
1216        diagnostics: Option<&dyn LeanWorkerDiagnosticSink>,
1217        cancellation: Option<&LeanWorkerCancellationToken>,
1218        progress: Option<&dyn LeanWorkerProgressSink>,
1219    ) -> Result<LeanWorkerStreamSummary, LeanWorkerError> {
1220        self.worker_run_data_stream_with_sink(
1221            export,
1222            request,
1223            LeanWorkerDataSinkTarget::Raw(rows),
1224            diagnostics,
1225            cancellation,
1226            progress,
1227        )
1228    }
1229
1230    fn worker_run_data_stream_with_sink(
1231        &mut self,
1232        export: &str,
1233        request: &serde_json::Value,
1234        rows: LeanWorkerDataSinkTarget<'_>,
1235        diagnostics: Option<&dyn LeanWorkerDiagnosticSink>,
1236        cancellation: Option<&LeanWorkerCancellationToken>,
1237        progress: Option<&dyn LeanWorkerProgressSink>,
1238    ) -> Result<LeanWorkerStreamSummary, LeanWorkerError> {
1239        const OPERATION: &str = "worker_run_data_stream";
1240        check_cancelled(OPERATION, cancellation)?;
1241        let request_json = serde_json::to_string(request).map_err(|err| LeanWorkerError::Protocol {
1242            message: format!("worker data-stream request JSON encode failed: {err}"),
1243        })?;
1244        self.prepare_request(false)?;
1245        self.send_request(Request::RunDataStream {
1246            export: export.to_owned(),
1247            request_json,
1248            progress: progress.is_some(),
1249        })?;
1250        self.record_request(false);
1251        self.stats.stream_requests = self.stats.stream_requests.saturating_add(1);
1252        match self.read_response_with_events(OPERATION, progress, cancellation, Some(rows), diagnostics)? {
1253            Response::StreamComplete { summary } => Ok(summary.into()),
1254            Response::StreamExportFailed { status_byte } => {
1255                Err(LeanWorkerError::StreamExportFailed { status: status_byte })
1256            }
1257            Response::StreamCallbackFailed {
1258                status_byte,
1259                description,
1260            } => Err(LeanWorkerError::StreamCallbackFailed {
1261                status: status_byte,
1262                description,
1263            }),
1264            Response::StreamRowMalformed { message } => Err(LeanWorkerError::StreamRowMalformed { message }),
1265            other @ (Response::HealthOk
1266            | Response::CapabilityLoaded
1267            | Response::U64 { .. }
1268            | Response::HostSessionOpened
1269            | Response::Elaboration { .. }
1270            | Response::KernelCheck { .. }
1271            | Response::Strings { .. }
1272            | Response::RowsComplete { .. }
1273            | Response::CapabilityMetadata { .. }
1274            | Response::CapabilityDoctor { .. }
1275            | Response::CapabilityMetadataMalformed { .. }
1276            | Response::CapabilityDoctorMalformed { .. }
1277            | Response::JsonCommand { .. }
1278            | Response::Terminating
1279            | Response::Error { .. }) => Err(unexpected_response(OPERATION, &other)),
1280        }
1281    }
1282
1283    pub(crate) fn worker_capability_metadata(
1284        &mut self,
1285        export: &str,
1286        request: &serde_json::Value,
1287        cancellation: Option<&LeanWorkerCancellationToken>,
1288        progress: Option<&dyn LeanWorkerProgressSink>,
1289    ) -> Result<LeanWorkerCapabilityMetadata, LeanWorkerError> {
1290        const OPERATION: &str = "worker_capability_metadata";
1291        check_cancelled(OPERATION, cancellation)?;
1292        let request_json = serde_json::to_string(request).map_err(|err| LeanWorkerError::Protocol {
1293            message: format!("worker capability metadata request JSON encode failed: {err}"),
1294        })?;
1295        self.prepare_request(false)?;
1296        self.send_request(Request::CapabilityMetadata {
1297            export: export.to_owned(),
1298            request_json,
1299        })?;
1300        self.record_request(false);
1301        match self.read_response_with_progress(OPERATION, progress, cancellation)? {
1302            Response::CapabilityMetadata { metadata } => Ok(metadata.into()),
1303            Response::CapabilityMetadataMalformed { message } => {
1304                Err(LeanWorkerError::CapabilityMetadataMalformed { message })
1305            }
1306            other @ (Response::HealthOk
1307            | Response::CapabilityLoaded
1308            | Response::U64 { .. }
1309            | Response::HostSessionOpened
1310            | Response::Elaboration { .. }
1311            | Response::KernelCheck { .. }
1312            | Response::Strings { .. }
1313            | Response::StreamComplete { .. }
1314            | Response::StreamExportFailed { .. }
1315            | Response::StreamCallbackFailed { .. }
1316            | Response::StreamRowMalformed { .. }
1317            | Response::CapabilityDoctor { .. }
1318            | Response::CapabilityDoctorMalformed { .. }
1319            | Response::JsonCommand { .. }
1320            | Response::RowsComplete { .. }
1321            | Response::Terminating
1322            | Response::Error { .. }) => Err(unexpected_response(OPERATION, &other)),
1323        }
1324    }
1325
1326    pub(crate) fn worker_capability_doctor(
1327        &mut self,
1328        export: &str,
1329        request: &serde_json::Value,
1330        cancellation: Option<&LeanWorkerCancellationToken>,
1331        progress: Option<&dyn LeanWorkerProgressSink>,
1332    ) -> Result<LeanWorkerDoctorReport, LeanWorkerError> {
1333        const OPERATION: &str = "worker_capability_doctor";
1334        check_cancelled(OPERATION, cancellation)?;
1335        let request_json = serde_json::to_string(request).map_err(|err| LeanWorkerError::Protocol {
1336            message: format!("worker capability doctor request JSON encode failed: {err}"),
1337        })?;
1338        self.prepare_request(false)?;
1339        self.send_request(Request::CapabilityDoctor {
1340            export: export.to_owned(),
1341            request_json,
1342        })?;
1343        self.record_request(false);
1344        match self.read_response_with_progress(OPERATION, progress, cancellation)? {
1345            Response::CapabilityDoctor { report } => Ok(report.into()),
1346            Response::CapabilityDoctorMalformed { message } => {
1347                Err(LeanWorkerError::CapabilityDoctorMalformed { message })
1348            }
1349            other @ (Response::HealthOk
1350            | Response::CapabilityLoaded
1351            | Response::U64 { .. }
1352            | Response::HostSessionOpened
1353            | Response::Elaboration { .. }
1354            | Response::KernelCheck { .. }
1355            | Response::Strings { .. }
1356            | Response::StreamComplete { .. }
1357            | Response::StreamExportFailed { .. }
1358            | Response::StreamCallbackFailed { .. }
1359            | Response::StreamRowMalformed { .. }
1360            | Response::CapabilityMetadata { .. }
1361            | Response::CapabilityMetadataMalformed { .. }
1362            | Response::JsonCommand { .. }
1363            | Response::RowsComplete { .. }
1364            | Response::Terminating
1365            | Response::Error { .. }) => Err(unexpected_response(OPERATION, &other)),
1366        }
1367    }
1368
1369    pub(crate) fn worker_json_command(
1370        &mut self,
1371        export: &str,
1372        request_json: String,
1373        cancellation: Option<&LeanWorkerCancellationToken>,
1374        progress: Option<&dyn LeanWorkerProgressSink>,
1375    ) -> Result<String, LeanWorkerError> {
1376        const OPERATION: &str = "worker_json_command";
1377        check_cancelled(OPERATION, cancellation)?;
1378        self.prepare_request(false)?;
1379        self.send_request(Request::JsonCommand {
1380            export: export.to_owned(),
1381            request_json,
1382        })?;
1383        self.record_request(false);
1384        match self.read_response_with_progress(OPERATION, progress, cancellation)? {
1385            Response::JsonCommand { response_json } => Ok(response_json),
1386            other @ (Response::HealthOk
1387            | Response::CapabilityLoaded
1388            | Response::U64 { .. }
1389            | Response::HostSessionOpened
1390            | Response::Elaboration { .. }
1391            | Response::KernelCheck { .. }
1392            | Response::Strings { .. }
1393            | Response::StreamComplete { .. }
1394            | Response::StreamExportFailed { .. }
1395            | Response::StreamCallbackFailed { .. }
1396            | Response::StreamRowMalformed { .. }
1397            | Response::CapabilityMetadata { .. }
1398            | Response::CapabilityDoctor { .. }
1399            | Response::CapabilityMetadataMalformed { .. }
1400            | Response::CapabilityDoctorMalformed { .. }
1401            | Response::RowsComplete { .. }
1402            | Response::Terminating
1403            | Response::Error { .. }) => Err(unexpected_response(OPERATION, &other)),
1404        }
1405    }
1406
1407    fn send_request(&mut self, request: Request) -> Result<(), LeanWorkerError> {
1408        self.ensure_running()?;
1409        let Some(stdin) = self.stdin.as_mut() else {
1410            return Err(self.dead_error());
1411        };
1412        write_frame(stdin, Message::Request(request)).map_err(|err| LeanWorkerError::Protocol {
1413            message: err.to_string(),
1414        })
1415    }
1416
1417    fn prepare_request(&mut self, import_like: bool) -> Result<(), LeanWorkerError> {
1418        self.ensure_running()?;
1419
1420        if let Some(limit) = self.config.restart_policy.max_requests
1421            && self.requests_since_restart >= limit
1422        {
1423            return self.restart_with_reason(LeanWorkerRestartReason::MaxRequests { limit });
1424        }
1425
1426        if import_like
1427            && let Some(limit) = self.config.restart_policy.max_imports
1428            && self.imports_since_restart >= limit
1429        {
1430            return self.restart_with_reason(LeanWorkerRestartReason::MaxImports { limit });
1431        }
1432
1433        if let Some(limit_kib) = self.config.restart_policy.max_rss_kib {
1434            match self.child_rss_kib() {
1435                Some(current_kib) if current_kib >= limit_kib => {
1436                    self.stats.last_rss_kib = Some(current_kib);
1437                    return self.restart_with_reason(LeanWorkerRestartReason::RssCeiling { current_kib, limit_kib });
1438                }
1439                Some(current_kib) => {
1440                    self.stats.last_rss_kib = Some(current_kib);
1441                }
1442                None => {
1443                    self.stats.rss_samples_unavailable = self.stats.rss_samples_unavailable.saturating_add(1);
1444                }
1445            }
1446        }
1447
1448        if let Some(limit) = self.config.restart_policy.idle_restart_after {
1449            let idle_for = self.last_activity.elapsed();
1450            if idle_for >= limit {
1451                return self.restart_with_reason(LeanWorkerRestartReason::Idle { idle_for, limit });
1452            }
1453        }
1454
1455        Ok(())
1456    }
1457
1458    fn record_request(&mut self, import_like: bool) {
1459        self.stats.requests = self.stats.requests.saturating_add(1);
1460        self.requests_since_restart = self.requests_since_restart.saturating_add(1);
1461        if import_like {
1462            self.stats.imports = self.stats.imports.saturating_add(1);
1463            self.imports_since_restart = self.imports_since_restart.saturating_add(1);
1464        }
1465        self.last_activity = Instant::now();
1466    }
1467
1468    fn restart_with_reason(&mut self, reason: LeanWorkerRestartReason) -> Result<(), LeanWorkerError> {
1469        let config = self.config.clone();
1470        self.stop_existing_child()?;
1471        self.stats.record_restart(reason);
1472        self.requests_since_restart = 0;
1473        self.imports_since_restart = 0;
1474        let mut next = Self::spawn(&config)?;
1475        next.stats = self.stats.clone();
1476        next.last_activity = Instant::now();
1477        *self = next;
1478        Ok(())
1479    }
1480
1481    fn read_response(&mut self, operation: &'static str) -> Result<Response, LeanWorkerError> {
1482        self.read_response_with_events(operation, None, None, None, None)
1483    }
1484
1485    fn read_response_with_progress(
1486        &mut self,
1487        operation: &'static str,
1488        progress: Option<&dyn LeanWorkerProgressSink>,
1489        cancellation: Option<&LeanWorkerCancellationToken>,
1490    ) -> Result<Response, LeanWorkerError> {
1491        self.read_response_with_events(operation, progress, cancellation, None, None)
1492    }
1493
1494    fn read_response_with_events(
1495        &mut self,
1496        operation: &'static str,
1497        progress: Option<&dyn LeanWorkerProgressSink>,
1498        cancellation: Option<&LeanWorkerCancellationToken>,
1499        data: Option<LeanWorkerDataSinkTarget<'_>>,
1500        diagnostics: Option<&dyn LeanWorkerDiagnosticSink>,
1501    ) -> Result<Response, LeanWorkerError> {
1502        let started = Instant::now();
1503        let timeout = self.config.request_timeout;
1504        let deadline = started.checked_add(timeout);
1505        let streaming = data.is_some();
1506        let mut request_backpressure_waits = 0_u64;
1507        let stdout = self.stdout.take().ok_or_else(|| self.dead_error())?;
1508        let (sender, receiver) = mpsc::sync_channel(WORKER_EVENT_BUFFER_CAPACITY);
1509        let _reader = thread::spawn(move || read_request_messages(stdout, sender));
1510
1511        loop {
1512            let event = match deadline.and_then(|deadline| deadline.checked_duration_since(Instant::now())) {
1513                Some(remaining) if remaining.is_zero() => {
1514                    if streaming {
1515                        self.record_stream_failure(started, request_backpressure_waits);
1516                    }
1517                    self.restart_with_reason(LeanWorkerRestartReason::RequestTimeout {
1518                        operation,
1519                        duration: timeout,
1520                    })?;
1521                    return Err(LeanWorkerError::Timeout {
1522                        operation,
1523                        duration: timeout,
1524                    });
1525                }
1526                Some(remaining) => match receiver.recv_timeout(remaining) {
1527                    Ok(event) => event,
1528                    Err(mpsc::RecvTimeoutError::Timeout) => {
1529                        if streaming {
1530                            self.record_stream_failure(started, request_backpressure_waits);
1531                        }
1532                        self.restart_with_reason(LeanWorkerRestartReason::RequestTimeout {
1533                            operation,
1534                            duration: timeout,
1535                        })?;
1536                        return Err(LeanWorkerError::Timeout {
1537                            operation,
1538                            duration: timeout,
1539                        });
1540                    }
1541                    Err(mpsc::RecvTimeoutError::Disconnected) => {
1542                        return Err(LeanWorkerError::Protocol {
1543                            message: "worker response reader exited without a terminal response".to_owned(),
1544                        });
1545                    }
1546                },
1547                None => match receiver.recv() {
1548                    Ok(event) => event,
1549                    Err(_err) => {
1550                        return Err(LeanWorkerError::Protocol {
1551                            message: "worker response reader exited without a terminal response".to_owned(),
1552                        });
1553                    }
1554                },
1555            };
1556            request_backpressure_waits = request_backpressure_waits.saturating_add(event.backpressure_waits());
1557            self.stats.backpressure_waits = self.stats.backpressure_waits.saturating_add(event.backpressure_waits());
1558
1559            let message = match event {
1560                RequestReaderEvent::Message { message, .. } => message,
1561                RequestReaderEvent::Terminal { message, stdout, .. } => {
1562                    self.stdout = Some(stdout);
1563                    match message {
1564                        Message::Response(Response::Error { code, message }) => {
1565                            if streaming {
1566                                self.record_stream_failure(started, request_backpressure_waits);
1567                            }
1568                            return Err(LeanWorkerError::Worker { code, message });
1569                        }
1570                        Message::Response(response) => {
1571                            if streaming {
1572                                if matches!(response, Response::StreamComplete { .. }) {
1573                                    self.record_stream_success(started);
1574                                } else {
1575                                    self.record_stream_failure(started, request_backpressure_waits);
1576                                }
1577                            }
1578                            return Ok(response);
1579                        }
1580                        other @ (Message::Handshake { .. }
1581                        | Message::Request(_)
1582                        | Message::Diagnostic(_)
1583                        | Message::ProgressTick(_)
1584                        | Message::DataRow(_)
1585                        | Message::FatalExit(_)) => {
1586                            return Err(LeanWorkerError::Protocol {
1587                                message: format!("worker sent unexpected {operation} message: {other:?}"),
1588                            });
1589                        }
1590                    }
1591                }
1592                RequestReaderEvent::ReadError { message, eof, .. } => {
1593                    if streaming {
1594                        self.record_stream_failure(started, request_backpressure_waits);
1595                    }
1596                    return if eof {
1597                        Err(self.record_exit_error())
1598                    } else {
1599                        Err(LeanWorkerError::Protocol { message })
1600                    };
1601                }
1602            };
1603
1604            match message {
1605                Message::ProgressTick(tick) => {
1606                    if let Err(err) =
1607                        report_parent_progress(progress, elapsed_event(tick.phase, tick.current, tick.total, started))
1608                    {
1609                        if streaming {
1610                            self.record_stream_failure(started, request_backpressure_waits);
1611                        }
1612                        return Err(err);
1613                    }
1614                    if cancellation.is_some_and(LeanWorkerCancellationToken::is_cancelled) {
1615                        if streaming {
1616                            self.record_stream_failure(started, request_backpressure_waits);
1617                        }
1618                        self.restart_with_reason(LeanWorkerRestartReason::Cancelled { operation })?;
1619                        return Err(LeanWorkerError::Cancelled { operation });
1620                    }
1621                }
1622                Message::DataRow(row) => {
1623                    let payload_bytes = row.payload.get().len() as u64;
1624                    if let Err(err) = report_parent_data_row(data, row) {
1625                        if streaming {
1626                            self.record_stream_failure(started, request_backpressure_waits);
1627                        }
1628                        return Err(err);
1629                    }
1630                    self.stats.data_rows_delivered = self.stats.data_rows_delivered.saturating_add(1);
1631                    self.stats.data_row_payload_bytes = self.stats.data_row_payload_bytes.saturating_add(payload_bytes);
1632                    if cancellation.is_some_and(LeanWorkerCancellationToken::is_cancelled) {
1633                        if streaming {
1634                            self.record_stream_failure(started, request_backpressure_waits);
1635                        }
1636                        self.restart_with_reason(LeanWorkerRestartReason::Cancelled { operation })?;
1637                        return Err(LeanWorkerError::Cancelled { operation });
1638                    }
1639                }
1640                Message::Diagnostic(diagnostic) => {
1641                    if let Err(err) = report_parent_diagnostic(diagnostics, diagnostic.into()) {
1642                        if streaming {
1643                            self.record_stream_failure(started, request_backpressure_waits);
1644                        }
1645                        return Err(err);
1646                    }
1647                }
1648                Message::Response(response) => return Err(unexpected_response(operation, &response)),
1649                other @ (Message::Handshake { .. } | Message::Request(_) | Message::FatalExit(_)) => {
1650                    return Err(LeanWorkerError::Protocol {
1651                        message: format!("worker sent unexpected {operation} message: {other:?}"),
1652                    });
1653                }
1654            }
1655        }
1656    }
1657
1658    fn ensure_running(&mut self) -> Result<(), LeanWorkerError> {
1659        match self.status()? {
1660            LeanWorkerStatus::Running => Ok(()),
1661            LeanWorkerStatus::Exited(exit) if exit.success => Err(LeanWorkerError::ChildExited { exit }),
1662            LeanWorkerStatus::Exited(exit) => Err(LeanWorkerError::ChildPanicOrAbort { exit }),
1663        }
1664    }
1665
1666    fn record_stream_success(&mut self, started: Instant) {
1667        self.stats.stream_successes = self.stats.stream_successes.saturating_add(1);
1668        self.stats.stream_elapsed = self.stats.stream_elapsed.saturating_add(started.elapsed());
1669    }
1670
1671    fn record_stream_failure(&mut self, started: Instant, backpressure_waits: u64) {
1672        self.stats.stream_failures = self.stats.stream_failures.saturating_add(1);
1673        self.stats.stream_elapsed = self.stats.stream_elapsed.saturating_add(started.elapsed());
1674        if backpressure_waits > 0 {
1675            self.stats.backpressure_failures = self.stats.backpressure_failures.saturating_add(1);
1676        }
1677    }
1678
1679    fn wait_for_exit(&mut self) -> Result<LeanWorkerExit, LeanWorkerError> {
1680        let Some(child) = self.child.as_mut() else {
1681            return Err(self.dead_error());
1682        };
1683        let status = child.wait().map_err(|source| LeanWorkerError::Wait { source })?;
1684        let diagnostics = self.read_stderr();
1685        let exit = LeanWorkerExit::from_status(status, diagnostics);
1686        self.last_exit = Some(exit.clone());
1687        self.child = None;
1688        self.stdin = None;
1689        self.stdout = None;
1690        self.stats.exits = self.stats.exits.saturating_add(1);
1691        Ok(exit)
1692    }
1693
1694    fn try_record_exit(&mut self) -> Option<LeanWorkerExit> {
1695        let child = self.child.as_mut()?;
1696        let status = child.try_wait().ok().flatten()?;
1697        let diagnostics = self.read_stderr();
1698        let exit = LeanWorkerExit::from_status(status, diagnostics);
1699        self.last_exit = Some(exit.clone());
1700        self.child = None;
1701        self.stdin = None;
1702        self.stdout = None;
1703        self.stats.exits = self.stats.exits.saturating_add(1);
1704        Some(exit)
1705    }
1706
1707    fn record_exit_error(&mut self) -> LeanWorkerError {
1708        match self.wait_for_exit() {
1709            Ok(exit) if exit.success => LeanWorkerError::ChildExited { exit },
1710            Ok(exit) => LeanWorkerError::ChildPanicOrAbort { exit },
1711            Err(err) => err,
1712        }
1713    }
1714
1715    fn stop_existing_child(&mut self) -> Result<(), LeanWorkerError> {
1716        if let Some(child) = self.child.as_mut() {
1717            drop(child.kill());
1718            let status = child.wait().map_err(|source| LeanWorkerError::Wait { source })?;
1719            let diagnostics = self.read_stderr();
1720            self.last_exit = Some(LeanWorkerExit::from_status(status, diagnostics));
1721            self.stats.exits = self.stats.exits.saturating_add(1);
1722        }
1723        self.child = None;
1724        self.stdin = None;
1725        self.stdout = None;
1726        Ok(())
1727    }
1728
1729    fn dead_error(&self) -> LeanWorkerError {
1730        let exit = self.last_exit.clone().unwrap_or_else(|| LeanWorkerExit {
1731            success: false,
1732            code: None,
1733            status: "worker is not running".to_owned(),
1734            diagnostics: String::new(),
1735        });
1736        if exit.success {
1737            LeanWorkerError::ChildExited { exit }
1738        } else {
1739            LeanWorkerError::ChildPanicOrAbort { exit }
1740        }
1741    }
1742
1743    fn read_stderr(&mut self) -> String {
1744        let mut diagnostics = String::new();
1745        if let Some(mut pipe) = self.stderr.take() {
1746            drop(pipe.read_to_string(&mut diagnostics));
1747        }
1748        diagnostics
1749    }
1750
1751    fn child_rss_kib(&mut self) -> Option<u64> {
1752        let child = self.child.as_mut()?;
1753        child_rss_kib(child.id())
1754    }
1755}
1756
1757enum RequestReaderEvent {
1758    Message {
1759        message: Message,
1760        backpressure_waits: u64,
1761    },
1762    Terminal {
1763        message: Message,
1764        stdout: BufReader<ChildStdout>,
1765        backpressure_waits: u64,
1766    },
1767    ReadError {
1768        message: String,
1769        eof: bool,
1770        backpressure_waits: u64,
1771    },
1772}
1773
1774impl RequestReaderEvent {
1775    fn backpressure_waits(&self) -> u64 {
1776        match self {
1777            Self::Message { backpressure_waits, .. }
1778            | Self::Terminal { backpressure_waits, .. }
1779            | Self::ReadError { backpressure_waits, .. } => *backpressure_waits,
1780        }
1781    }
1782
1783    fn add_backpressure_wait(&mut self) {
1784        match self {
1785            Self::Message { backpressure_waits, .. }
1786            | Self::Terminal { backpressure_waits, .. }
1787            | Self::ReadError { backpressure_waits, .. } => {
1788                *backpressure_waits = backpressure_waits.saturating_add(1);
1789            }
1790        }
1791    }
1792}
1793
1794#[allow(
1795    clippy::needless_pass_by_value,
1796    reason = "the request reader thread must own the sender"
1797)]
1798fn read_request_messages(mut stdout: BufReader<ChildStdout>, sender: mpsc::SyncSender<RequestReaderEvent>) {
1799    loop {
1800        match read_frame(&mut stdout) {
1801            Ok(frame) if matches!(frame.message, Message::Response(_)) => {
1802                let _ = send_reader_event(
1803                    &sender,
1804                    RequestReaderEvent::Terminal {
1805                        message: frame.message,
1806                        stdout,
1807                        backpressure_waits: 0,
1808                    },
1809                );
1810                return;
1811            }
1812            Ok(frame) => {
1813                if send_reader_event(
1814                    &sender,
1815                    RequestReaderEvent::Message {
1816                        message: frame.message,
1817                        backpressure_waits: 0,
1818                    },
1819                )
1820                .is_err()
1821                {
1822                    return;
1823                }
1824            }
1825            Err(err) => {
1826                let _ = send_reader_event(
1827                    &sender,
1828                    RequestReaderEvent::ReadError {
1829                        message: err.to_string(),
1830                        eof: err.is_eof(),
1831                        backpressure_waits: 0,
1832                    },
1833                );
1834                return;
1835            }
1836        }
1837    }
1838}
1839
1840fn send_reader_event(sender: &mpsc::SyncSender<RequestReaderEvent>, event: RequestReaderEvent) -> Result<(), ()> {
1841    match sender.try_send(event) {
1842        Ok(()) => Ok(()),
1843        Err(mpsc::TrySendError::Full(mut event)) => {
1844            event.add_backpressure_wait();
1845            sender.send(event).map_err(|_| ())
1846        }
1847        Err(mpsc::TrySendError::Disconnected(_event)) => Err(()),
1848    }
1849}
1850
1851impl Drop for LeanWorker {
1852    fn drop(&mut self) {
1853        if let Some(child) = self.child.as_mut() {
1854            drop(child.kill());
1855            drop(child.wait());
1856        }
1857    }
1858}
1859
1860fn expect_handshake(stdout: &mut BufReader<ChildStdout>) -> Result<LeanWorkerRuntimeMetadata, LeanWorkerError> {
1861    let frame = read_frame(stdout).map_err(|err| {
1862        if err.is_eof() {
1863            LeanWorkerError::Handshake {
1864                message: "child closed stdout before handshake".to_owned(),
1865            }
1866        } else {
1867            LeanWorkerError::Handshake {
1868                message: err.to_string(),
1869            }
1870        }
1871    })?;
1872    match frame.message {
1873        Message::Handshake {
1874            worker_version,
1875            protocol_version,
1876        } if protocol_version == crate::protocol::PROTOCOL_VERSION => Ok(LeanWorkerRuntimeMetadata {
1877            worker_version,
1878            protocol_version,
1879            lean_version: None,
1880        }),
1881        other @ (Message::Handshake { .. }
1882        | Message::Request(_)
1883        | Message::Response(_)
1884        | Message::Diagnostic(_)
1885        | Message::ProgressTick(_)
1886        | Message::DataRow(_)
1887        | Message::FatalExit(_)) => Err(LeanWorkerError::Handshake {
1888            message: format!("unexpected handshake frame: {other:?}"),
1889        }),
1890    }
1891}
1892
1893fn wait_with_stderr(child: &mut Child, stderr: Option<ChildStderr>) -> Result<LeanWorkerExit, LeanWorkerError> {
1894    let status = child.wait().map_err(|source| LeanWorkerError::Wait { source })?;
1895    let mut diagnostics = String::new();
1896    if let Some(mut pipe) = stderr {
1897        drop(pipe.read_to_string(&mut diagnostics));
1898    }
1899    Ok(LeanWorkerExit::from_status(status, diagnostics))
1900}
1901
1902fn unexpected_response(operation: &'static str, response: &Response) -> LeanWorkerError {
1903    LeanWorkerError::Protocol {
1904        message: format!("worker sent unexpected {operation} response: {response:?}"),
1905    }
1906}
1907
/// Renders `path` as an owned string, replacing any non-Unicode sequences
/// lossily.
fn path_string(path: &Path) -> String {
    String::from(path.to_string_lossy())
}
1911
/// Reads the resident set size of process `pid` from `/proc/<pid>/status`.
///
/// Returns the first numeric value found on a `VmRSS:` line, or `None` when
/// the file cannot be read or holds no parsable `VmRSS:` entry.
#[cfg(target_os = "linux")]
fn child_rss_kib(pid: u32) -> Option<u64> {
    let status_path = format!("/proc/{pid}/status");
    let status = std::fs::read_to_string(status_path).ok()?;
    status
        .lines()
        .filter_map(|line| line.strip_prefix("VmRSS:"))
        .filter_map(|rest| rest.split_whitespace().next())
        .find_map(|value| value.parse::<u64>().ok())
}
1920
/// Asks `ps -o rss= -p <pid>` for the resident set size of process `pid`.
///
/// Returns `None` when `ps` cannot be run, exits unsuccessfully, or prints
/// something that is not a positive integer.
#[cfg(not(target_os = "linux"))]
fn child_rss_kib(pid: u32) -> Option<u64> {
    let pid_arg = pid.to_string();
    let mut ps = Command::new("ps");
    ps.args(["-o", "rss=", "-p", pid_arg.as_str()]);
    let output = ps.output().ok()?;
    if !output.status.success() {
        return None;
    }
    match String::from_utf8_lossy(&output.stdout).trim().parse::<u64>() {
        Ok(rss) if rss > 0 => Some(rss),
        Ok(_zero) | Err(_err) => None,
    }
}