1use std::ffi::OsString;
2use std::fmt;
3use std::io::{BufReader, BufWriter, Read as _};
4use std::path::{Path, PathBuf};
5use std::process::{Child, ChildStderr, ChildStdin, ChildStdout, Command, ExitStatus, Stdio};
6use std::sync::mpsc;
7use std::thread;
8use std::time::{Duration, Instant};
9
10use std::sync::Mutex;
11
12use crate::capability::LeanWorkerBootstrapDiagnosticCode;
13use crate::protocol::{Message, Request, Response, read_frame, write_frame};
14use crate::session::LeanWorkerDataSinkTarget;
15use crate::session::{
16 LeanWorkerCancellationToken, LeanWorkerDataSink, LeanWorkerDiagnosticSink, LeanWorkerProgressSink,
17 LeanWorkerRawDataRow, LeanWorkerRawDataSink, LeanWorkerRuntimeMetadata, LeanWorkerSessionConfig,
18 LeanWorkerStreamSummary, check_cancelled, elapsed_event, report_parent_data_row, report_parent_diagnostic,
19 report_parent_progress,
20};
21use crate::types::{
22 LeanWorkerCapabilityMetadata, LeanWorkerDeclarationFilter, LeanWorkerDeclarationRow, LeanWorkerDoctorReport,
23 LeanWorkerElabOptions, LeanWorkerElabResult, LeanWorkerKernelResult, LeanWorkerMetaResult,
24 LeanWorkerMetaTransparency, LeanWorkerProcessFileOutcome, LeanWorkerProcessModuleOutcome, LeanWorkerRendered,
25};
26
/// Startup timeout installed by `LeanWorkerConfig::new` (10 seconds); `LeanWorker::spawn`
/// waits at most this long for the handshake unless the config overrides it.
const DEFAULT_STARTUP_TIMEOUT: Duration = Duration::from_secs(10);
// NOTE(review): not referenced in the visible part of this file; presumably the capacity of a
// worker-event buffer defined further down — confirm at the use site.
const WORKER_EVENT_BUFFER_CAPACITY: usize = 64;

/// Request timeout installed by `LeanWorkerConfig::new` (30 seconds).
pub const LEAN_WORKER_REQUEST_TIMEOUT_DEFAULT: Duration = Duration::from_secs(30);

/// Request timeout selected by `LeanWorkerConfig::long_running_requests` (10 minutes).
pub const LEAN_WORKER_REQUEST_TIMEOUT_LONG_RUNNING: Duration = Duration::from_mins(10);
35
/// Launch and supervision settings for a worker child process.
///
/// Created with `LeanWorkerConfig::new` and refined through the consuming builder methods.
#[derive(Clone, Debug)]
pub struct LeanWorkerConfig {
    /// Program handed to `Command::new` by `LeanWorker::spawn`.
    executable: PathBuf,
    /// Working directory applied to the child when set.
    current_dir: Option<PathBuf>,
    /// Extra environment variables, applied in insertion order after the built-in defaults.
    env: Vec<(OsString, OsString)>,
    /// Upper bound on how long `LeanWorker::spawn` waits for the handshake result.
    startup_timeout: Duration,
    /// Per-request timeout; surfaced through `LeanWorker::request_timeout`.
    request_timeout: Duration,
    /// Limits describing when the worker should be restarted.
    restart_policy: LeanWorkerRestartPolicy,
}
66
67impl LeanWorkerConfig {
68 pub fn new(executable: impl Into<PathBuf>) -> Self {
70 Self {
71 executable: executable.into(),
72 current_dir: None,
73 env: Vec::new(),
74 startup_timeout: DEFAULT_STARTUP_TIMEOUT,
75 request_timeout: LEAN_WORKER_REQUEST_TIMEOUT_DEFAULT,
76 restart_policy: LeanWorkerRestartPolicy::default(),
77 }
78 }
79
80 pub fn executable(&self) -> &Path {
82 &self.executable
83 }
84
85 #[must_use]
87 pub fn current_dir(mut self, path: impl Into<PathBuf>) -> Self {
88 self.current_dir = Some(path.into());
89 self
90 }
91
92 #[must_use]
94 pub fn env(mut self, key: impl Into<OsString>, value: impl Into<OsString>) -> Self {
95 self.env.push((key.into(), value.into()));
96 self
97 }
98
99 #[must_use]
101 pub fn startup_timeout(mut self, timeout: Duration) -> Self {
102 self.startup_timeout = timeout;
103 self
104 }
105
106 #[must_use]
112 pub fn request_timeout(mut self, timeout: Duration) -> Self {
113 self.request_timeout = timeout;
114 self
115 }
116
117 #[must_use]
119 pub fn long_running_requests(mut self) -> Self {
120 self.request_timeout = LEAN_WORKER_REQUEST_TIMEOUT_LONG_RUNNING;
121 self
122 }
123
124 #[must_use]
130 pub fn restart_policy(mut self, policy: LeanWorkerRestartPolicy) -> Self {
131 self.restart_policy = policy;
132 self
133 }
134}
135
/// Optional limits describing when a worker should be restarted.
///
/// Every limit is `None` (unset) in the default value; the builder methods set them one by one.
#[derive(Clone, Debug, Default, Eq, PartialEq)]
pub struct LeanWorkerRestartPolicy {
    /// Request-count limit; the builder stores at least 1.
    max_requests: Option<u64>,
    /// Import-count limit; the builder stores at least 1.
    max_imports: Option<u64>,
    /// Resident-set-size ceiling in KiB; the builder stores at least 1.
    max_rss_kib: Option<u64>,
    /// Idle duration limit.
    // NOTE(review): presumably measured against the worker's last activity — confirm where the policy is enforced.
    idle_restart_after: Option<Duration>,
}
149
150impl LeanWorkerRestartPolicy {
151 #[must_use]
153 pub fn disabled() -> Self {
154 Self::default()
155 }
156
157 #[must_use]
159 pub fn max_requests(mut self, limit: u64) -> Self {
160 self.max_requests = Some(limit.max(1));
161 self
162 }
163
164 #[must_use]
166 pub fn max_imports(mut self, limit: u64) -> Self {
167 self.max_imports = Some(limit.max(1));
168 self
169 }
170
171 #[must_use]
177 pub fn max_rss_kib(mut self, limit: u64) -> Self {
178 self.max_rss_kib = Some(limit.max(1));
179 self
180 }
181
182 #[must_use]
184 pub fn idle_restart_after(mut self, duration: Duration) -> Self {
185 self.idle_restart_after = Some(duration);
186 self
187 }
188}
189
/// Why a worker restart happened; `LeanWorkerStats::record_restart` keeps one counter per variant.
#[derive(Clone, Debug, Eq, PartialEq)]
pub enum LeanWorkerRestartReason {
    /// Restart requested directly (this is the reason `LeanWorker::cycle` passes).
    Explicit,
    /// Restart attributed to the request-count limit `limit`.
    MaxRequests { limit: u64 },
    /// Restart attributed to the import-count limit `limit`.
    MaxImports { limit: u64 },
    /// Restart attributed to resident memory: `current_kib` observed against `limit_kib`.
    RssCeiling { current_kib: u64, limit_kib: u64 },
    /// Restart attributed to idleness: `idle_for` observed against `limit`.
    Idle { idle_for: Duration, limit: Duration },
    /// Restart attributed to cancellation of `operation`.
    Cancelled { operation: &'static str },
    /// Restart attributed to `operation` timing out after `duration`.
    RequestTimeout {
        operation: &'static str,
        duration: Duration,
    },
}
211
/// Counters describing a worker's activity; `LeanWorker::stats` returns a clone of this value.
///
/// NOTE(review): the request, import, stream and backpressure counters are maintained outside
/// the code visible here; their descriptions below are inferred from the field names.
#[derive(Clone, Debug, Default, Eq, PartialEq)]
pub struct LeanWorkerStats {
    /// Presumably the number of requests recorded — confirm against `record_request`.
    pub requests: u64,
    /// Presumably the number of import-bearing requests recorded — confirm against `record_request`.
    pub imports: u64,
    /// Incremented by `LeanWorker::status` when it observes that the child has exited.
    pub exits: u64,
    /// Total restarts recorded, regardless of reason.
    pub restarts: u64,
    /// Restarts recorded with `LeanWorkerRestartReason::Explicit`.
    pub explicit_cycles: u64,
    /// Restarts recorded with `LeanWorkerRestartReason::MaxRequests`.
    pub max_request_restarts: u64,
    /// Restarts recorded with `LeanWorkerRestartReason::MaxImports`.
    pub max_import_restarts: u64,
    /// Restarts recorded with `LeanWorkerRestartReason::RssCeiling`.
    pub rss_restarts: u64,
    /// Restarts recorded with `LeanWorkerRestartReason::Idle`.
    pub idle_restarts: u64,
    /// Restarts recorded with `LeanWorkerRestartReason::Cancelled`.
    pub cancelled_restarts: u64,
    /// Restarts recorded with `LeanWorkerRestartReason::RequestTimeout`.
    pub timeout_restarts: u64,
    /// Incremented by `LeanWorker::rss_kib` each time no RSS sample is available.
    pub rss_samples_unavailable: u64,
    /// Most recent RSS sample in KiB stored by `LeanWorker::rss_kib`.
    pub last_rss_kib: Option<u64>,
    /// Reason passed to the most recent recorded restart.
    pub last_restart_reason: Option<LeanWorkerRestartReason>,
    /// Presumably the number of streaming requests issued — TODO confirm.
    pub stream_requests: u64,
    /// Presumably the number of streaming requests that succeeded — TODO confirm.
    pub stream_successes: u64,
    /// Presumably the number of streaming requests that failed — TODO confirm.
    pub stream_failures: u64,
    /// Presumably the number of data rows handed to sinks — TODO confirm.
    pub data_rows_delivered: u64,
    /// Presumably the total payload size of delivered rows in bytes — TODO confirm.
    pub data_row_payload_bytes: u64,
    /// Presumably the accumulated time spent in streaming requests — TODO confirm.
    pub stream_elapsed: Duration,
    /// Presumably the number of times delivery waited on backpressure — TODO confirm.
    pub backpressure_waits: u64,
    /// Presumably the number of backpressure-related failures — TODO confirm.
    pub backpressure_failures: u64,
}
260
261impl LeanWorkerStats {
262 fn record_restart(&mut self, reason: LeanWorkerRestartReason) {
263 self.restarts = self.restarts.saturating_add(1);
264 match &reason {
265 LeanWorkerRestartReason::Explicit => {
266 self.explicit_cycles = self.explicit_cycles.saturating_add(1);
267 }
268 LeanWorkerRestartReason::MaxRequests { .. } => {
269 self.max_request_restarts = self.max_request_restarts.saturating_add(1);
270 }
271 LeanWorkerRestartReason::MaxImports { .. } => {
272 self.max_import_restarts = self.max_import_restarts.saturating_add(1);
273 }
274 LeanWorkerRestartReason::RssCeiling { .. } => {
275 self.rss_restarts = self.rss_restarts.saturating_add(1);
276 }
277 LeanWorkerRestartReason::Idle { .. } => {
278 self.idle_restarts = self.idle_restarts.saturating_add(1);
279 }
280 LeanWorkerRestartReason::Cancelled { .. } => {
281 self.cancelled_restarts = self.cancelled_restarts.saturating_add(1);
282 }
283 LeanWorkerRestartReason::RequestTimeout { .. } => {
284 self.timeout_restarts = self.timeout_restarts.saturating_add(1);
285 }
286 }
287 self.last_restart_reason = Some(reason);
288 }
289}
290
/// Liveness of the worker child as reported by `LeanWorker::status`.
#[derive(Clone, Debug, Eq, PartialEq)]
pub enum LeanWorkerStatus {
    /// The child has not been observed to exit.
    Running,
    /// The child has exited (or is not running); the payload describes how.
    Exited(LeanWorkerExit),
}
299
/// Snapshot of how a worker child ended.
#[derive(Clone, Debug, Eq, PartialEq)]
pub struct LeanWorkerExit {
    /// Value of `ExitStatus::success` for the child.
    pub success: bool,
    /// Value of `ExitStatus::code` for the child, when one exists.
    pub code: Option<i32>,
    /// Display rendering of the exit status (or a fixed message when no status is available).
    pub status: String,
    /// Diagnostic text supplied alongside the status; `LeanWorker::status` passes the result of `read_stderr`.
    pub diagnostics: String,
}
312
313impl LeanWorkerExit {
314 fn from_status(status: ExitStatus, diagnostics: String) -> Self {
315 Self {
316 success: status.success(),
317 code: status.code(),
318 status: status.to_string(),
319 diagnostics,
320 }
321 }
322}
323
/// Errors reported by the worker supervisor.
///
/// The per-variant notes mirror the messages produced by the `Display` implementation.
#[derive(Debug)]
pub enum LeanWorkerError {
    /// The worker executable could not be spawned.
    Spawn {
        executable: PathBuf,
        source: std::io::Error,
    },
    /// The `lean-rs-worker-child` binary could not be resolved; `tried` lists the candidates.
    WorkerChildUnresolved {
        tried: Vec<PathBuf>,
    },
    /// The worker child at `path` is not executable.
    WorkerChildNotExecutable { path: PathBuf, reason: String },
    /// A worker bootstrap check identified by `code` failed.
    Bootstrap {
        code: LeanWorkerBootstrapDiagnosticCode,
        message: String,
    },
    /// Building the worker capability Lake target failed.
    CapabilityBuild {
        diagnostic: lean_toolchain::LinkDiagnostics,
    },
    /// Setting up the child failed (for example a missing stdio pipe).
    Setup { message: String },
    /// The startup handshake failed.
    Handshake { message: String },
    /// The worker protocol failed.
    Protocol { message: String },
    /// The worker returned an error with the given code.
    Worker { code: String, message: String },
    /// The worker exited.
    ChildExited { exit: LeanWorkerExit },
    /// The worker exited fatally.
    ChildPanicOrAbort { exit: LeanWorkerExit },
    /// `operation` did not finish within `duration`.
    Timeout {
        operation: &'static str,
        duration: Duration,
    },
    /// `operation` was cancelled.
    Cancelled { operation: &'static str },
    /// The progress sink panicked.
    ProgressPanic { message: String },
    /// The data sink panicked.
    DataSinkPanic { message: String },
    /// The diagnostic sink panicked.
    DiagnosticSinkPanic { message: String },
    /// A streaming export returned a failure status.
    StreamExportFailed { status: u8 },
    /// A streaming callback failed with the given status.
    StreamCallbackFailed { status: u8, description: String },
    /// A streaming export emitted a malformed row.
    StreamRowMalformed { message: String },
    /// The capability metadata export returned malformed JSON.
    CapabilityMetadataMalformed { message: String },
    /// Capability metadata obtained from `export` did not match the expectation.
    CapabilityMetadataMismatch {
        export: String,
        expected: Box<LeanWorkerCapabilityMetadata>,
        actual: Box<LeanWorkerCapabilityMetadata>,
    },
    /// The capability doctor export returned malformed JSON.
    CapabilityDoctorMalformed { message: String },
    /// Encoding the request JSON for typed command `export` failed.
    TypedCommandRequestEncode { export: String, message: String },
    /// Decoding the response JSON for typed command `export` failed.
    TypedCommandResponseDecode { export: String, message: String },
    /// Decoding a row of typed command `export` failed at `stream` / `sequence`.
    TypedCommandRowDecode {
        export: String,
        stream: String,
        sequence: u64,
        message: String,
    },
    /// Decoding the terminal summary of typed command `export` failed.
    TypedCommandSummaryDecode { export: String, message: String },
    /// A worker pool lease was invalidated.
    LeaseInvalidated { reason: String },
    /// The worker pool cannot admit another session key.
    WorkerPoolExhausted { max_workers: usize },
    /// The worker pool cannot admit work within its RSS budget.
    WorkerPoolMemoryBudgetExceeded { current_kib: u64, limit_kib: u64 },
    /// Worker pool admission timed out after `waited`.
    WorkerPoolQueueTimeout { waited: Duration },
    /// `operation` is not supported.
    UnsupportedRequest { operation: &'static str },
    /// Waiting for the worker child failed.
    Wait { source: std::io::Error },
}
416
417impl fmt::Display for LeanWorkerError {
418 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
419 match self {
420 Self::Spawn { executable, source } => {
421 write!(f, "failed to spawn worker {}: {source}", executable.display())
422 }
423 Self::WorkerChildUnresolved { tried } => {
424 let tried = tried
425 .iter()
426 .map(|path| path.display().to_string())
427 .collect::<Vec<_>>()
428 .join(", ");
429 write!(
430 f,
431 "could not resolve lean-rs-worker-child; set LEAN_RS_WORKER_CHILD or place it beside the current executable (tried: {tried})"
432 )
433 }
434 Self::WorkerChildNotExecutable { path, reason } => {
435 write!(f, "worker child '{}' is not executable: {reason}", path.display())
436 }
437 Self::Bootstrap { code, message } => {
438 write!(f, "worker bootstrap check {code} failed: {message}")
439 }
440 Self::CapabilityBuild { diagnostic } => {
441 write!(f, "worker capability Lake target build failed: {diagnostic}")
442 }
443 Self::Setup { message } => write!(f, "worker child setup failed: {message}"),
444 Self::Handshake { message } => write!(f, "worker handshake failed: {message}"),
445 Self::Protocol { message } => write!(f, "worker protocol failed: {message}"),
446 Self::Worker { code, message } => write!(f, "worker returned {code}: {message}"),
447 Self::ChildExited { exit } => write!(f, "worker exited with {}", exit.status),
448 Self::ChildPanicOrAbort { exit } => {
449 write!(f, "worker exited fatally with {}", exit.status)
450 }
451 Self::Timeout { operation, duration } => {
452 write!(f, "worker operation {operation} timed out after {duration:?}")
453 }
454 Self::Cancelled { operation } => write!(f, "worker operation {operation} was cancelled"),
455 Self::ProgressPanic { message } => write!(f, "worker progress sink panicked: {message}"),
456 Self::DataSinkPanic { message } => write!(f, "worker data sink panicked: {message}"),
457 Self::DiagnosticSinkPanic { message } => {
458 write!(f, "worker diagnostic sink panicked: {message}")
459 }
460 Self::StreamExportFailed { status } => write!(f, "streaming export returned status {status}"),
461 Self::StreamCallbackFailed { status, description } => {
462 write!(f, "streaming callback failed with status {status}: {description}")
463 }
464 Self::StreamRowMalformed { message } => write!(f, "streaming export emitted malformed row: {message}"),
465 Self::CapabilityMetadataMalformed { message } => {
466 write!(f, "capability metadata export returned malformed JSON: {message}")
467 }
468 Self::CapabilityMetadataMismatch { export, .. } => {
469 write!(f, "capability metadata from {export} did not match expectation")
470 }
471 Self::CapabilityDoctorMalformed { message } => {
472 write!(f, "capability doctor export returned malformed JSON: {message}")
473 }
474 Self::TypedCommandRequestEncode { export, message } => {
475 write!(f, "typed worker command {export} request JSON encode failed: {message}")
476 }
477 Self::TypedCommandResponseDecode { export, message } => {
478 write!(
479 f,
480 "typed worker command {export} response JSON decode failed: {message}"
481 )
482 }
483 Self::TypedCommandRowDecode {
484 export,
485 stream,
486 sequence,
487 message,
488 } => {
489 write!(
490 f,
491 "typed worker command {export} row decode failed at stream {stream} sequence {sequence}: {message}"
492 )
493 }
494 Self::TypedCommandSummaryDecode { export, message } => {
495 write!(
496 f,
497 "typed worker command {export} terminal summary decode failed: {message}"
498 )
499 }
500 Self::LeaseInvalidated { reason } => write!(f, "worker pool lease was invalidated: {reason}"),
501 Self::WorkerPoolExhausted { max_workers } => {
502 write!(
503 f,
504 "worker pool cannot admit another session key; max_workers={max_workers}"
505 )
506 }
507 Self::WorkerPoolMemoryBudgetExceeded { current_kib, limit_kib } => {
508 write!(
509 f,
510 "worker pool cannot admit work within RSS budget; current_kib={current_kib} limit_kib={limit_kib}"
511 )
512 }
513 Self::WorkerPoolQueueTimeout { waited } => {
514 write!(f, "worker pool admission timed out after {waited:?}")
515 }
516 Self::UnsupportedRequest { operation } => {
517 write!(f, "worker operation {operation} is not supported")
518 }
519 Self::Wait { source } => write!(f, "failed to wait for worker child: {source}"),
520 }
521 }
522}
523
524impl std::error::Error for LeanWorkerError {
525 fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
526 match self {
527 Self::Spawn { source, .. } | Self::Wait { source } => Some(source),
528 Self::CapabilityBuild { diagnostic } => Some(diagnostic),
529 Self::WorkerChildUnresolved { .. } | Self::WorkerChildNotExecutable { .. } | Self::Bootstrap { .. } => None,
530 Self::Setup { .. }
531 | Self::Handshake { .. }
532 | Self::Protocol { .. }
533 | Self::Worker { .. }
534 | Self::ChildExited { .. }
535 | Self::ChildPanicOrAbort { .. }
536 | Self::Timeout { .. }
537 | Self::Cancelled { .. }
538 | Self::ProgressPanic { .. }
539 | Self::DataSinkPanic { .. }
540 | Self::DiagnosticSinkPanic { .. }
541 | Self::StreamExportFailed { .. }
542 | Self::StreamCallbackFailed { .. }
543 | Self::StreamRowMalformed { .. }
544 | Self::CapabilityMetadataMalformed { .. }
545 | Self::CapabilityMetadataMismatch { .. }
546 | Self::CapabilityDoctorMalformed { .. }
547 | Self::TypedCommandRequestEncode { .. }
548 | Self::TypedCommandResponseDecode { .. }
549 | Self::TypedCommandRowDecode { .. }
550 | Self::TypedCommandSummaryDecode { .. }
551 | Self::LeaseInvalidated { .. }
552 | Self::WorkerPoolExhausted { .. }
553 | Self::WorkerPoolMemoryBudgetExceeded { .. }
554 | Self::WorkerPoolQueueTimeout { .. }
555 | Self::UnsupportedRequest { .. } => None,
556 }
557 }
558}
559
/// Handle to one worker child process together with its pipes and bookkeeping.
#[derive(Debug)]
pub struct LeanWorker {
    /// Configuration the worker was spawned with.
    config: LeanWorkerConfig,
    /// The child process; cleared once `status` observes that it exited.
    child: Option<Child>,
    /// Buffered writer over the child's stdin; cleared together with `child`.
    stdin: Option<BufWriter<ChildStdin>>,
    /// Buffered reader over the child's stdout; cleared together with `child`.
    stdout: Option<BufReader<ChildStdout>>,
    /// The child's stderr pipe, when it was available at spawn time.
    stderr: Option<ChildStderr>,
    /// Exit record cached by `status` after the child is seen to have exited.
    last_exit: Option<LeanWorkerExit>,
    /// Metadata obtained from the startup handshake.
    runtime_metadata: LeanWorkerRuntimeMetadata,
    /// Activity counters; cloned out by `stats`.
    stats: LeanWorkerStats,
    // NOTE(review): the three fields below are initialised in `spawn` but updated outside the
    // visible code; presumably they feed the restart policy — confirm.
    requests_since_restart: u64,
    imports_since_restart: u64,
    last_activity: Instant,
}
579
580impl LeanWorker {
581 pub fn spawn(config: &LeanWorkerConfig) -> Result<Self, LeanWorkerError> {
589 let mut command = Command::new(&config.executable);
590 command
591 .stdin(Stdio::piped())
592 .stdout(Stdio::piped())
593 .stderr(Stdio::piped())
594 .env("LEAN_ABORT_ON_PANIC", "1")
595 .env("LEAN_BACKTRACE", "0")
596 .env("RUST_BACKTRACE", "0");
597
598 if let Some(current_dir) = &config.current_dir {
599 command.current_dir(current_dir);
600 }
601 for (key, value) in &config.env {
602 command.env(key, value);
603 }
604
605 let mut child = command.spawn().map_err(|source| LeanWorkerError::Spawn {
606 executable: config.executable.clone(),
607 source,
608 })?;
609
610 let stdin = child
611 .stdin
612 .take()
613 .map(BufWriter::new)
614 .ok_or_else(|| LeanWorkerError::Setup {
615 message: "child stdin unavailable".to_owned(),
616 })?;
617 let stdout = child.stdout.take().ok_or_else(|| LeanWorkerError::Setup {
618 message: "child stdout unavailable".to_owned(),
619 })?;
620 let stderr = child.stderr.take();
621
622 let (sender, receiver) = mpsc::channel();
623 let _handshake_reader = thread::spawn(move || {
624 let mut stdout = BufReader::new(stdout);
625 let result = expect_handshake(&mut stdout);
626 drop(sender.send((stdout, result)));
627 });
628
629 let (stdout, runtime_metadata) = match receiver.recv_timeout(config.startup_timeout) {
630 Ok((stdout, Ok(metadata))) => (stdout, metadata),
631 Ok((_stdout, Err(err))) => {
632 let mut worker = Self {
633 config: config.clone(),
634 child: Some(child),
635 stdin: Some(stdin),
636 stdout: None,
637 stderr,
638 last_exit: None,
639 runtime_metadata: LeanWorkerRuntimeMetadata {
640 worker_version: String::new(),
641 protocol_version: crate::protocol::PROTOCOL_VERSION,
642 lean_version: None,
643 },
644 stats: LeanWorkerStats::default(),
645 requests_since_restart: 0,
646 imports_since_restart: 0,
647 last_activity: Instant::now(),
648 };
649 let exit = worker.try_record_exit();
650 return Err(match exit {
651 Some(exit) if !exit.success => LeanWorkerError::ChildPanicOrAbort { exit },
652 Some(exit) => LeanWorkerError::ChildExited { exit },
653 None => err,
654 });
655 }
656 Err(mpsc::RecvTimeoutError::Timeout) => {
657 drop(child.kill());
658 let _exit = wait_with_stderr(&mut child, stderr)?;
659 return Err(LeanWorkerError::Timeout {
660 operation: "startup",
661 duration: config.startup_timeout,
662 });
663 }
664 Err(mpsc::RecvTimeoutError::Disconnected) => {
665 return Err(LeanWorkerError::Handshake {
666 message: "handshake reader exited without a result".to_owned(),
667 });
668 }
669 };
670
671 Ok(Self {
672 config: config.clone(),
673 child: Some(child),
674 stdin: Some(stdin),
675 stdout: Some(stdout),
676 stderr,
677 last_exit: None,
678 runtime_metadata,
679 stats: LeanWorkerStats::default(),
680 requests_since_restart: 0,
681 imports_since_restart: 0,
682 last_activity: Instant::now(),
683 })
684 }
685
686 pub fn health(&mut self) -> Result<(), LeanWorkerError> {
693 self.prepare_request(false)?;
694 self.send_request(Request::Health)?;
695 self.record_request(false);
696 match self.read_response("health")? {
697 Response::HealthOk => Ok(()),
698 other @ (Response::CapabilityLoaded
699 | Response::U64 { .. }
700 | Response::HostSessionOpened
701 | Response::Elaboration { .. }
702 | Response::KernelCheck { .. }
703 | Response::Strings { .. }
704 | Response::StreamComplete { .. }
705 | Response::StreamExportFailed { .. }
706 | Response::StreamCallbackFailed { .. }
707 | Response::StreamRowMalformed { .. }
708 | Response::CapabilityMetadata { .. }
709 | Response::CapabilityDoctor { .. }
710 | Response::CapabilityMetadataMalformed { .. }
711 | Response::CapabilityDoctorMalformed { .. }
712 | Response::JsonCommand { .. }
713 | Response::MetaExpr { .. }
714 | Response::MetaBool { .. }
715 | Response::Declaration { .. }
716 | Response::DeclarationBulk { .. }
717 | Response::ProcessFile { .. }
718 | Response::ProcessModule { .. }
719 | Response::RowsComplete { .. }
720 | Response::Terminating
721 | Response::Error { .. }) => Err(unexpected_response("health", &other)),
722 }
723 }
724
725 pub fn load_fixture_capability(&mut self, fixture_root: impl AsRef<Path>) -> Result<(), LeanWorkerError> {
736 self.prepare_request(true)?;
737 self.send_request(Request::LoadFixtureCapability {
738 fixture_root: path_string(fixture_root.as_ref()),
739 })?;
740 self.record_request(true);
741 match self.read_response("load_fixture_capability")? {
742 Response::CapabilityLoaded => Ok(()),
743 other @ (Response::HealthOk
744 | Response::U64 { .. }
745 | Response::HostSessionOpened
746 | Response::Elaboration { .. }
747 | Response::KernelCheck { .. }
748 | Response::Strings { .. }
749 | Response::StreamComplete { .. }
750 | Response::StreamExportFailed { .. }
751 | Response::StreamCallbackFailed { .. }
752 | Response::StreamRowMalformed { .. }
753 | Response::CapabilityMetadata { .. }
754 | Response::CapabilityDoctor { .. }
755 | Response::CapabilityMetadataMalformed { .. }
756 | Response::CapabilityDoctorMalformed { .. }
757 | Response::JsonCommand { .. }
758 | Response::MetaExpr { .. }
759 | Response::MetaBool { .. }
760 | Response::Declaration { .. }
761 | Response::DeclarationBulk { .. }
762 | Response::ProcessFile { .. }
763 | Response::ProcessModule { .. }
764 | Response::RowsComplete { .. }
765 | Response::Terminating
766 | Response::Error { .. }) => Err(unexpected_response("load_fixture_capability", &other)),
767 }
768 }
769
770 pub fn call_fixture_mul(
777 &mut self,
778 fixture_root: impl AsRef<Path>,
779 lhs: u64,
780 rhs: u64,
781 ) -> Result<u64, LeanWorkerError> {
782 self.prepare_request(true)?;
783 self.send_request(Request::CallFixtureMul {
784 fixture_root: path_string(fixture_root.as_ref()),
785 lhs,
786 rhs,
787 })?;
788 self.record_request(true);
789 match self.read_response("call_fixture_mul")? {
790 Response::U64 { value } => Ok(value),
791 other @ (Response::HealthOk
792 | Response::CapabilityLoaded
793 | Response::HostSessionOpened
794 | Response::Elaboration { .. }
795 | Response::KernelCheck { .. }
796 | Response::Strings { .. }
797 | Response::StreamComplete { .. }
798 | Response::StreamExportFailed { .. }
799 | Response::StreamCallbackFailed { .. }
800 | Response::StreamRowMalformed { .. }
801 | Response::CapabilityMetadata { .. }
802 | Response::CapabilityDoctor { .. }
803 | Response::CapabilityMetadataMalformed { .. }
804 | Response::CapabilityDoctorMalformed { .. }
805 | Response::JsonCommand { .. }
806 | Response::MetaExpr { .. }
807 | Response::MetaBool { .. }
808 | Response::Declaration { .. }
809 | Response::DeclarationBulk { .. }
810 | Response::ProcessFile { .. }
811 | Response::ProcessModule { .. }
812 | Response::RowsComplete { .. }
813 | Response::Terminating
814 | Response::Error { .. }) => Err(unexpected_response("call_fixture_mul", &other)),
815 }
816 }
817
818 pub fn status(&mut self) -> Result<LeanWorkerStatus, LeanWorkerError> {
824 if let Some(exit) = &self.last_exit {
825 return Ok(LeanWorkerStatus::Exited(exit.clone()));
826 }
827 let Some(child) = self.child.as_mut() else {
828 return Ok(LeanWorkerStatus::Exited(LeanWorkerExit {
829 success: false,
830 code: None,
831 status: "worker is not running".to_owned(),
832 diagnostics: String::new(),
833 }));
834 };
835 match child.try_wait().map_err(|source| LeanWorkerError::Wait { source })? {
836 Some(status) => {
837 let diagnostics = self.read_stderr();
838 let exit = LeanWorkerExit::from_status(status, diagnostics);
839 self.last_exit = Some(exit.clone());
840 self.child = None;
841 self.stdin = None;
842 self.stdout = None;
843 self.stats.exits = self.stats.exits.saturating_add(1);
844 Ok(LeanWorkerStatus::Exited(exit))
845 }
846 None => Ok(LeanWorkerStatus::Running),
847 }
848 }
849
850 #[must_use]
852 pub fn stats(&self) -> LeanWorkerStats {
853 self.stats.clone()
854 }
855
856 #[must_use]
858 pub fn runtime_metadata(&self) -> LeanWorkerRuntimeMetadata {
859 self.runtime_metadata.clone()
860 }
861
862 pub fn rss_kib(&mut self) -> Option<u64> {
868 match self.child_rss_kib() {
869 Some(value) => {
870 self.stats.last_rss_kib = Some(value);
871 Some(value)
872 }
873 None => {
874 self.stats.rss_samples_unavailable = self.stats.rss_samples_unavailable.saturating_add(1);
875 None
876 }
877 }
878 }
879
    /// Returns the per-request timeout currently stored in this worker's configuration.
    #[must_use]
    pub fn request_timeout(&self) -> Duration {
        self.config.request_timeout
    }
885
    /// Replaces the per-request timeout stored in this worker's configuration.
    pub fn set_request_timeout(&mut self, timeout: Duration) {
        self.config.request_timeout = timeout;
    }
893
894 pub fn cycle(&mut self) -> Result<(), LeanWorkerError> {
905 self.restart_with_reason(LeanWorkerRestartReason::Explicit)
906 }
907
    /// Crate-internal variant of `cycle` that lets the caller choose the restart reason;
    /// forwards directly to `restart_with_reason`.
    pub(crate) fn cycle_with_restart_reason(&mut self, reason: LeanWorkerRestartReason) -> Result<(), LeanWorkerError> {
        self.restart_with_reason(reason)
    }
911
912 pub fn restart(&mut self) -> Result<(), LeanWorkerError> {
923 self.cycle()
924 }
925
926 #[doc(hidden)]
927 pub fn __kill_for_test(&mut self) -> Result<(), LeanWorkerError> {
934 let Some(child) = self.child.as_mut() else {
935 return Err(self.dead_error());
936 };
937 child.kill().map_err(|source| LeanWorkerError::Wait { source })?;
938 Ok(())
939 }
940
941 pub fn terminate(mut self) -> Result<LeanWorkerExit, LeanWorkerError> {
948 self.send_request(Request::Terminate)?;
949 match self.read_response("terminate")? {
950 Response::Terminating => self.wait_for_exit(),
951 other @ (Response::HealthOk
952 | Response::CapabilityLoaded
953 | Response::U64 { .. }
954 | Response::HostSessionOpened
955 | Response::Elaboration { .. }
956 | Response::KernelCheck { .. }
957 | Response::Strings { .. }
958 | Response::StreamComplete { .. }
959 | Response::StreamExportFailed { .. }
960 | Response::StreamCallbackFailed { .. }
961 | Response::StreamRowMalformed { .. }
962 | Response::CapabilityMetadata { .. }
963 | Response::CapabilityDoctor { .. }
964 | Response::CapabilityMetadataMalformed { .. }
965 | Response::CapabilityDoctorMalformed { .. }
966 | Response::JsonCommand { .. }
967 | Response::MetaExpr { .. }
968 | Response::MetaBool { .. }
969 | Response::Declaration { .. }
970 | Response::DeclarationBulk { .. }
971 | Response::ProcessFile { .. }
972 | Response::ProcessModule { .. }
973 | Response::RowsComplete { .. }
974 | Response::Error { .. }) => Err(unexpected_response("terminate", &other)),
975 }
976 }
977
978 #[doc(hidden)]
979 pub fn __trigger_lean_panic_fixture(
986 mut self,
987 fixture_root: impl AsRef<Path>,
988 ) -> Result<LeanWorkerExit, LeanWorkerError> {
989 self.prepare_request(true)?;
990 self.send_request(Request::TriggerLeanPanic {
991 fixture_root: path_string(fixture_root.as_ref()),
992 })?;
993 self.record_request(true);
994 match self.read_response("trigger_lean_panic") {
995 Ok(response) => Err(unexpected_response("trigger_lean_panic", &response)),
996 Err(LeanWorkerError::ChildPanicOrAbort { exit }) => Ok(exit),
997 Err(err) => Err(err),
998 }
999 }
1000
1001 #[doc(hidden)]
1002 pub fn __emit_test_rows(
1009 &mut self,
1010 streams: Vec<String>,
1011 cancellation: Option<&LeanWorkerCancellationToken>,
1012 data: Option<&dyn LeanWorkerDataSink>,
1013 ) -> Result<u64, LeanWorkerError> {
1014 const OPERATION: &str = "emit_test_rows";
1015 check_cancelled(OPERATION, cancellation)?;
1016 self.prepare_request(false)?;
1017 self.send_request(Request::EmitTestRows { streams })?;
1018 self.record_request(false);
1019 match self.read_response_with_events(
1020 OPERATION,
1021 None,
1022 cancellation,
1023 data.map(LeanWorkerDataSinkTarget::Value),
1024 None,
1025 )? {
1026 Response::RowsComplete { count } => Ok(count),
1027 other @ (Response::HealthOk
1028 | Response::CapabilityLoaded
1029 | Response::U64 { .. }
1030 | Response::HostSessionOpened
1031 | Response::Elaboration { .. }
1032 | Response::KernelCheck { .. }
1033 | Response::Strings { .. }
1034 | Response::StreamComplete { .. }
1035 | Response::StreamExportFailed { .. }
1036 | Response::StreamCallbackFailed { .. }
1037 | Response::StreamRowMalformed { .. }
1038 | Response::CapabilityMetadata { .. }
1039 | Response::CapabilityDoctor { .. }
1040 | Response::CapabilityMetadataMalformed { .. }
1041 | Response::CapabilityDoctorMalformed { .. }
1042 | Response::JsonCommand { .. }
1043 | Response::MetaExpr { .. }
1044 | Response::MetaBool { .. }
1045 | Response::Declaration { .. }
1046 | Response::DeclarationBulk { .. }
1047 | Response::ProcessFile { .. }
1048 | Response::ProcessModule { .. }
1049 | Response::Terminating
1050 | Response::Error { .. }) => Err(unexpected_response(OPERATION, &other)),
1051 }
1052 }
1053
1054 pub(crate) fn open_worker_session(
1055 &mut self,
1056 config: &LeanWorkerSessionConfig,
1057 cancellation: Option<&LeanWorkerCancellationToken>,
1058 progress: Option<&dyn LeanWorkerProgressSink>,
1059 ) -> Result<(), LeanWorkerError> {
1060 const OPERATION: &str = "open_worker_session";
1061 check_cancelled(OPERATION, cancellation)?;
1062 self.prepare_request(true)?;
1063 self.send_request(Request::OpenHostSession {
1064 project_root: config.project_root_string(),
1065 package: config.package().to_owned(),
1066 lib_name: config.lib_name().to_owned(),
1067 imports: config.imports().to_vec(),
1068 })?;
1069 self.record_request(true);
1070 match self.read_response_with_progress(OPERATION, progress, cancellation)? {
1071 Response::HostSessionOpened => Ok(()),
1072 other @ (Response::HealthOk
1073 | Response::CapabilityLoaded
1074 | Response::U64 { .. }
1075 | Response::Elaboration { .. }
1076 | Response::KernelCheck { .. }
1077 | Response::Strings { .. }
1078 | Response::StreamComplete { .. }
1079 | Response::StreamExportFailed { .. }
1080 | Response::StreamCallbackFailed { .. }
1081 | Response::StreamRowMalformed { .. }
1082 | Response::CapabilityMetadata { .. }
1083 | Response::CapabilityDoctor { .. }
1084 | Response::CapabilityMetadataMalformed { .. }
1085 | Response::CapabilityDoctorMalformed { .. }
1086 | Response::JsonCommand { .. }
1087 | Response::MetaExpr { .. }
1088 | Response::MetaBool { .. }
1089 | Response::Declaration { .. }
1090 | Response::DeclarationBulk { .. }
1091 | Response::ProcessFile { .. }
1092 | Response::ProcessModule { .. }
1093 | Response::RowsComplete { .. }
1094 | Response::Terminating
1095 | Response::Error { .. }) => Err(unexpected_response(OPERATION, &other)),
1096 }
1097 }
1098
1099 #[expect(
1100 clippy::wildcard_enum_match_arm,
1101 reason = "round_trip deliberately collapses per-method Response wildcards into a uniform unexpected_response branch; a new variant surfaces at runtime, not compile time"
1102 )]
1103 pub(crate) fn worker_elaborate(
1104 &mut self,
1105 source: &str,
1106 options: &LeanWorkerElabOptions,
1107 cancellation: Option<&LeanWorkerCancellationToken>,
1108 progress: Option<&dyn LeanWorkerProgressSink>,
1109 ) -> Result<LeanWorkerElabResult, LeanWorkerError> {
1110 self.round_trip(
1111 "worker_elaborate",
1112 Request::Elaborate {
1113 source: source.to_owned(),
1114 options: options.clone(),
1115 },
1116 false,
1117 cancellation,
1118 progress,
1119 |response, operation| match response {
1120 Response::Elaboration { outcome } => Ok(outcome),
1121 other => Err(unexpected_response(operation, &other)),
1122 },
1123 )
1124 }
1125
1126 #[expect(
1127 clippy::wildcard_enum_match_arm,
1128 reason = "round_trip deliberately collapses per-method Response wildcards into a uniform unexpected_response branch; a new variant surfaces at runtime, not compile time"
1129 )]
1130 pub(crate) fn worker_kernel_check(
1131 &mut self,
1132 source: &str,
1133 options: &LeanWorkerElabOptions,
1134 cancellation: Option<&LeanWorkerCancellationToken>,
1135 progress: Option<&dyn LeanWorkerProgressSink>,
1136 ) -> Result<LeanWorkerKernelResult, LeanWorkerError> {
1137 self.round_trip(
1138 "worker_kernel_check",
1139 Request::KernelCheck {
1140 source: source.to_owned(),
1141 options: options.clone(),
1142 progress: progress.is_some(),
1143 },
1144 false,
1145 cancellation,
1146 progress,
1147 |response, operation| match response {
1148 Response::KernelCheck { outcome } => Ok(outcome),
1149 other => Err(unexpected_response(operation, &other)),
1150 },
1151 )
1152 }
1153
1154 #[expect(
1155 clippy::wildcard_enum_match_arm,
1156 reason = "round_trip deliberately collapses per-method Response wildcards into a uniform unexpected_response branch; a new variant surfaces at runtime, not compile time"
1157 )]
1158 pub(crate) fn worker_declaration_kinds(
1159 &mut self,
1160 names: &[&str],
1161 cancellation: Option<&LeanWorkerCancellationToken>,
1162 progress: Option<&dyn LeanWorkerProgressSink>,
1163 ) -> Result<Vec<String>, LeanWorkerError> {
1164 self.round_trip(
1165 "worker_declaration_kinds",
1166 Request::DeclarationKinds {
1167 names: names.iter().map(|name| (*name).to_owned()).collect(),
1168 progress: progress.is_some(),
1169 },
1170 false,
1171 cancellation,
1172 progress,
1173 |response, operation| match response {
1174 Response::Strings { values } => Ok(values),
1175 other => Err(unexpected_response(operation, &other)),
1176 },
1177 )
1178 }
1179
1180 #[expect(
1181 clippy::wildcard_enum_match_arm,
1182 reason = "round_trip deliberately collapses per-method Response wildcards into a uniform unexpected_response branch; a new variant surfaces at runtime, not compile time"
1183 )]
1184 pub(crate) fn worker_declaration_names(
1185 &mut self,
1186 names: &[&str],
1187 cancellation: Option<&LeanWorkerCancellationToken>,
1188 progress: Option<&dyn LeanWorkerProgressSink>,
1189 ) -> Result<Vec<String>, LeanWorkerError> {
1190 self.round_trip(
1191 "worker_declaration_names",
1192 Request::DeclarationNames {
1193 names: names.iter().map(|name| (*name).to_owned()).collect(),
1194 progress: progress.is_some(),
1195 },
1196 false,
1197 cancellation,
1198 progress,
1199 |response, operation| match response {
1200 Response::Strings { values } => Ok(values),
1201 other => Err(unexpected_response(operation, &other)),
1202 },
1203 )
1204 }
1205
1206 #[expect(
1207 clippy::wildcard_enum_match_arm,
1208 reason = "round_trip deliberately collapses per-method Response wildcards into a uniform unexpected_response branch; a new variant surfaces at runtime, not compile time"
1209 )]
1210 pub(crate) fn worker_infer_type(
1211 &mut self,
1212 source: &str,
1213 options: &LeanWorkerElabOptions,
1214 cancellation: Option<&LeanWorkerCancellationToken>,
1215 progress: Option<&dyn LeanWorkerProgressSink>,
1216 ) -> Result<LeanWorkerMetaResult<LeanWorkerRendered>, LeanWorkerError> {
1217 self.round_trip(
1218 "worker_infer_type",
1219 Request::InferType {
1220 source: source.to_owned(),
1221 options: options.clone(),
1222 },
1223 false,
1224 cancellation,
1225 progress,
1226 |response, operation| match response {
1227 Response::MetaExpr { result } => Ok(result),
1228 other => Err(unexpected_response(operation, &other)),
1229 },
1230 )
1231 }
1232
1233 #[expect(
1234 clippy::wildcard_enum_match_arm,
1235 reason = "round_trip deliberately collapses per-method Response wildcards into a uniform unexpected_response branch; a new variant surfaces at runtime, not compile time"
1236 )]
1237 pub(crate) fn worker_whnf(
1238 &mut self,
1239 source: &str,
1240 options: &LeanWorkerElabOptions,
1241 cancellation: Option<&LeanWorkerCancellationToken>,
1242 progress: Option<&dyn LeanWorkerProgressSink>,
1243 ) -> Result<LeanWorkerMetaResult<LeanWorkerRendered>, LeanWorkerError> {
1244 self.round_trip(
1245 "worker_whnf",
1246 Request::Whnf {
1247 source: source.to_owned(),
1248 options: options.clone(),
1249 },
1250 false,
1251 cancellation,
1252 progress,
1253 |response, operation| match response {
1254 Response::MetaExpr { result } => Ok(result),
1255 other => Err(unexpected_response(operation, &other)),
1256 },
1257 )
1258 }
1259
1260 #[expect(
1261 clippy::wildcard_enum_match_arm,
1262 reason = "round_trip deliberately collapses per-method Response wildcards into a uniform unexpected_response branch; a new variant surfaces at runtime, not compile time"
1263 )]
1264 pub(crate) fn worker_is_def_eq(
1265 &mut self,
1266 lhs: &str,
1267 rhs: &str,
1268 transparency: LeanWorkerMetaTransparency,
1269 options: &LeanWorkerElabOptions,
1270 cancellation: Option<&LeanWorkerCancellationToken>,
1271 progress: Option<&dyn LeanWorkerProgressSink>,
1272 ) -> Result<LeanWorkerMetaResult<bool>, LeanWorkerError> {
1273 self.round_trip(
1274 "worker_is_def_eq",
1275 Request::IsDefEq {
1276 lhs: lhs.to_owned(),
1277 rhs: rhs.to_owned(),
1278 transparency,
1279 options: options.clone(),
1280 },
1281 false,
1282 cancellation,
1283 progress,
1284 |response, operation| match response {
1285 Response::MetaBool { result } => Ok(result),
1286 other => Err(unexpected_response(operation, &other)),
1287 },
1288 )
1289 }
1290
1291 #[expect(
1292 clippy::wildcard_enum_match_arm,
1293 reason = "round_trip deliberately collapses per-method Response wildcards into a uniform unexpected_response branch; a new variant surfaces at runtime, not compile time"
1294 )]
1295 pub(crate) fn worker_describe(
1296 &mut self,
1297 name: &str,
1298 cancellation: Option<&LeanWorkerCancellationToken>,
1299 progress: Option<&dyn LeanWorkerProgressSink>,
1300 ) -> Result<Option<LeanWorkerDeclarationRow>, LeanWorkerError> {
1301 self.round_trip(
1302 "worker_describe",
1303 Request::Describe { name: name.to_owned() },
1304 false,
1305 cancellation,
1306 progress,
1307 |response, operation| match response {
1308 Response::Declaration { row } => Ok(row),
1309 other => Err(unexpected_response(operation, &other)),
1310 },
1311 )
1312 }
1313
1314 #[allow(
1315 clippy::needless_pass_by_value,
1316 reason = "filter is cheap to clone, passed by value matches caller shape"
1317 )]
1318 pub(crate) fn worker_list_declarations_strings(
1319 &mut self,
1320 filter: LeanWorkerDeclarationFilter,
1321 cancellation: Option<&LeanWorkerCancellationToken>,
1322 progress: Option<&dyn LeanWorkerProgressSink>,
1323 ) -> Result<Vec<String>, LeanWorkerError> {
1324 const OPERATION: &str = "worker_list_declarations_strings";
1325 check_cancelled(OPERATION, cancellation)?;
1326 self.prepare_request(false)?;
1327 self.send_request(Request::ListDeclarationsStrings {
1328 filter,
1329 progress: progress.is_some(),
1330 })?;
1331 self.record_request(false);
1332 let collector = DeclarationNameCollector::default();
1333 let response = self.read_response_with_events(
1334 OPERATION,
1335 progress,
1336 cancellation,
1337 Some(LeanWorkerDataSinkTarget::Raw(&collector)),
1338 None,
1339 )?;
1340 if let Some(message) = collector.decode_error.lock().ok().and_then(|guard| guard.clone()) {
1341 return Err(LeanWorkerError::Protocol { message });
1342 }
1343 match response {
1344 Response::RowsComplete { count } => {
1345 let names = collector.into_inner();
1346 let observed = u64::try_from(names.len()).unwrap_or(u64::MAX);
1347 if observed != count {
1348 return Err(LeanWorkerError::Protocol {
1349 message: format!(
1350 "worker_list_declarations_strings: parent collected {observed} rows but child reported {count}"
1351 ),
1352 });
1353 }
1354 Ok(names)
1355 }
1356 other @ (Response::HealthOk
1357 | Response::CapabilityLoaded
1358 | Response::U64 { .. }
1359 | Response::HostSessionOpened
1360 | Response::Elaboration { .. }
1361 | Response::KernelCheck { .. }
1362 | Response::Strings { .. }
1363 | Response::StreamComplete { .. }
1364 | Response::StreamExportFailed { .. }
1365 | Response::StreamCallbackFailed { .. }
1366 | Response::StreamRowMalformed { .. }
1367 | Response::CapabilityMetadata { .. }
1368 | Response::CapabilityDoctor { .. }
1369 | Response::CapabilityMetadataMalformed { .. }
1370 | Response::CapabilityDoctorMalformed { .. }
1371 | Response::JsonCommand { .. }
1372 | Response::MetaExpr { .. }
1373 | Response::MetaBool { .. }
1374 | Response::Declaration { .. }
1375 | Response::DeclarationBulk { .. }
1376 | Response::ProcessFile { .. }
1377 | Response::ProcessModule { .. }
1378 | Response::Terminating
1379 | Response::Error { .. }) => Err(unexpected_response(OPERATION, &other)),
1380 }
1381 }
1382
1383 #[expect(
1384 clippy::wildcard_enum_match_arm,
1385 reason = "round_trip deliberately collapses per-method Response wildcards into a uniform unexpected_response branch; a new variant surfaces at runtime, not compile time"
1386 )]
1387 pub(crate) fn worker_describe_bulk(
1388 &mut self,
1389 names: &[&str],
1390 cancellation: Option<&LeanWorkerCancellationToken>,
1391 progress: Option<&dyn LeanWorkerProgressSink>,
1392 ) -> Result<Vec<LeanWorkerDeclarationRow>, LeanWorkerError> {
1393 self.round_trip(
1394 "worker_describe_bulk",
1395 Request::DescribeBulk {
1396 names: names.iter().map(|name| (*name).to_owned()).collect(),
1397 progress: progress.is_some(),
1398 },
1399 false,
1400 cancellation,
1401 progress,
1402 |response, operation| match response {
1403 Response::DeclarationBulk { rows } => Ok(rows),
1404 other => Err(unexpected_response(operation, &other)),
1405 },
1406 )
1407 }
1408
1409 #[expect(
1410 clippy::wildcard_enum_match_arm,
1411 reason = "round_trip deliberately collapses per-method Response wildcards into a uniform unexpected_response branch; a new variant surfaces at runtime, not compile time"
1412 )]
1413 pub(crate) fn worker_process_file(
1414 &mut self,
1415 source: &str,
1416 options: &LeanWorkerElabOptions,
1417 cancellation: Option<&LeanWorkerCancellationToken>,
1418 progress: Option<&dyn LeanWorkerProgressSink>,
1419 ) -> Result<LeanWorkerProcessFileOutcome, LeanWorkerError> {
1420 self.round_trip(
1421 "worker_process_file",
1422 Request::ProcessFile {
1423 source: source.to_owned(),
1424 options: options.clone(),
1425 },
1426 false,
1427 cancellation,
1428 progress,
1429 |response, operation| match response {
1430 Response::ProcessFile { outcome } => Ok(outcome),
1431 other => Err(unexpected_response(operation, &other)),
1432 },
1433 )
1434 }
1435
1436 #[expect(
1437 clippy::wildcard_enum_match_arm,
1438 reason = "round_trip deliberately collapses per-method Response wildcards into a uniform unexpected_response branch; a new variant surfaces at runtime, not compile time"
1439 )]
1440 pub(crate) fn worker_process_module(
1441 &mut self,
1442 source: &str,
1443 options: &LeanWorkerElabOptions,
1444 cancellation: Option<&LeanWorkerCancellationToken>,
1445 progress: Option<&dyn LeanWorkerProgressSink>,
1446 ) -> Result<LeanWorkerProcessModuleOutcome, LeanWorkerError> {
1447 self.round_trip(
1448 "worker_process_module",
1449 Request::ProcessModule {
1450 source: source.to_owned(),
1451 options: options.clone(),
1452 },
1453 false,
1454 cancellation,
1455 progress,
1456 |response, operation| match response {
1457 Response::ProcessModule { outcome } => Ok(outcome),
1458 other => Err(unexpected_response(operation, &other)),
1459 },
1460 )
1461 }
1462
1463 pub(crate) fn worker_run_data_stream(
1464 &mut self,
1465 export: &str,
1466 request: &serde_json::Value,
1467 rows: &dyn LeanWorkerDataSink,
1468 diagnostics: Option<&dyn LeanWorkerDiagnosticSink>,
1469 cancellation: Option<&LeanWorkerCancellationToken>,
1470 progress: Option<&dyn LeanWorkerProgressSink>,
1471 ) -> Result<LeanWorkerStreamSummary, LeanWorkerError> {
1472 self.worker_run_data_stream_with_sink(
1473 export,
1474 request,
1475 LeanWorkerDataSinkTarget::Value(rows),
1476 diagnostics,
1477 cancellation,
1478 progress,
1479 )
1480 }
1481
1482 pub(crate) fn worker_run_data_stream_raw(
1483 &mut self,
1484 export: &str,
1485 request: &serde_json::Value,
1486 rows: &dyn LeanWorkerRawDataSink,
1487 diagnostics: Option<&dyn LeanWorkerDiagnosticSink>,
1488 cancellation: Option<&LeanWorkerCancellationToken>,
1489 progress: Option<&dyn LeanWorkerProgressSink>,
1490 ) -> Result<LeanWorkerStreamSummary, LeanWorkerError> {
1491 self.worker_run_data_stream_with_sink(
1492 export,
1493 request,
1494 LeanWorkerDataSinkTarget::Raw(rows),
1495 diagnostics,
1496 cancellation,
1497 progress,
1498 )
1499 }
1500
1501 fn worker_run_data_stream_with_sink(
1502 &mut self,
1503 export: &str,
1504 request: &serde_json::Value,
1505 rows: LeanWorkerDataSinkTarget<'_>,
1506 diagnostics: Option<&dyn LeanWorkerDiagnosticSink>,
1507 cancellation: Option<&LeanWorkerCancellationToken>,
1508 progress: Option<&dyn LeanWorkerProgressSink>,
1509 ) -> Result<LeanWorkerStreamSummary, LeanWorkerError> {
1510 const OPERATION: &str = "worker_run_data_stream";
1511 check_cancelled(OPERATION, cancellation)?;
1512 let request_json = serde_json::to_string(request).map_err(|err| LeanWorkerError::Protocol {
1513 message: format!("worker data-stream request JSON encode failed: {err}"),
1514 })?;
1515 self.prepare_request(false)?;
1516 self.send_request(Request::RunDataStream {
1517 export: export.to_owned(),
1518 request_json,
1519 progress: progress.is_some(),
1520 })?;
1521 self.record_request(false);
1522 self.stats.stream_requests = self.stats.stream_requests.saturating_add(1);
1523 match self.read_response_with_events(OPERATION, progress, cancellation, Some(rows), diagnostics)? {
1524 Response::StreamComplete { summary } => Ok(summary.into()),
1525 Response::StreamExportFailed { status_byte } => {
1526 Err(LeanWorkerError::StreamExportFailed { status: status_byte })
1527 }
1528 Response::StreamCallbackFailed {
1529 status_byte,
1530 description,
1531 } => Err(LeanWorkerError::StreamCallbackFailed {
1532 status: status_byte,
1533 description,
1534 }),
1535 Response::StreamRowMalformed { message } => Err(LeanWorkerError::StreamRowMalformed { message }),
1536 other @ (Response::HealthOk
1537 | Response::CapabilityLoaded
1538 | Response::U64 { .. }
1539 | Response::HostSessionOpened
1540 | Response::Elaboration { .. }
1541 | Response::KernelCheck { .. }
1542 | Response::Strings { .. }
1543 | Response::RowsComplete { .. }
1544 | Response::CapabilityMetadata { .. }
1545 | Response::CapabilityDoctor { .. }
1546 | Response::CapabilityMetadataMalformed { .. }
1547 | Response::CapabilityDoctorMalformed { .. }
1548 | Response::JsonCommand { .. }
1549 | Response::MetaExpr { .. }
1550 | Response::MetaBool { .. }
1551 | Response::Declaration { .. }
1552 | Response::DeclarationBulk { .. }
1553 | Response::ProcessFile { .. }
1554 | Response::ProcessModule { .. }
1555 | Response::Terminating
1556 | Response::Error { .. }) => Err(unexpected_response(OPERATION, &other)),
1557 }
1558 }
1559
1560 #[expect(
1561 clippy::wildcard_enum_match_arm,
1562 reason = "round_trip deliberately collapses per-method Response wildcards into a uniform unexpected_response branch; a new variant surfaces at runtime, not compile time"
1563 )]
1564 pub(crate) fn worker_capability_metadata(
1565 &mut self,
1566 export: &str,
1567 request: &serde_json::Value,
1568 cancellation: Option<&LeanWorkerCancellationToken>,
1569 progress: Option<&dyn LeanWorkerProgressSink>,
1570 ) -> Result<LeanWorkerCapabilityMetadata, LeanWorkerError> {
1571 let request_json = serde_json::to_string(request).map_err(|err| LeanWorkerError::Protocol {
1572 message: format!("worker capability metadata request JSON encode failed: {err}"),
1573 })?;
1574 self.round_trip(
1575 "worker_capability_metadata",
1576 Request::CapabilityMetadata {
1577 export: export.to_owned(),
1578 request_json,
1579 },
1580 false,
1581 cancellation,
1582 progress,
1583 |response, operation| match response {
1584 Response::CapabilityMetadata { metadata } => Ok(metadata),
1585 Response::CapabilityMetadataMalformed { message } => {
1586 Err(LeanWorkerError::CapabilityMetadataMalformed { message })
1587 }
1588 other => Err(unexpected_response(operation, &other)),
1589 },
1590 )
1591 }
1592
1593 #[expect(
1594 clippy::wildcard_enum_match_arm,
1595 reason = "round_trip deliberately collapses per-method Response wildcards into a uniform unexpected_response branch; a new variant surfaces at runtime, not compile time"
1596 )]
1597 pub(crate) fn worker_capability_doctor(
1598 &mut self,
1599 export: &str,
1600 request: &serde_json::Value,
1601 cancellation: Option<&LeanWorkerCancellationToken>,
1602 progress: Option<&dyn LeanWorkerProgressSink>,
1603 ) -> Result<LeanWorkerDoctorReport, LeanWorkerError> {
1604 let request_json = serde_json::to_string(request).map_err(|err| LeanWorkerError::Protocol {
1605 message: format!("worker capability doctor request JSON encode failed: {err}"),
1606 })?;
1607 self.round_trip(
1608 "worker_capability_doctor",
1609 Request::CapabilityDoctor {
1610 export: export.to_owned(),
1611 request_json,
1612 },
1613 false,
1614 cancellation,
1615 progress,
1616 |response, operation| match response {
1617 Response::CapabilityDoctor { report } => Ok(report),
1618 Response::CapabilityDoctorMalformed { message } => {
1619 Err(LeanWorkerError::CapabilityDoctorMalformed { message })
1620 }
1621 other => Err(unexpected_response(operation, &other)),
1622 },
1623 )
1624 }
1625
1626 #[expect(
1627 clippy::wildcard_enum_match_arm,
1628 reason = "round_trip deliberately collapses per-method Response wildcards into a uniform unexpected_response branch; a new variant surfaces at runtime, not compile time"
1629 )]
1630 pub(crate) fn worker_json_command(
1631 &mut self,
1632 export: &str,
1633 request_json: String,
1634 cancellation: Option<&LeanWorkerCancellationToken>,
1635 progress: Option<&dyn LeanWorkerProgressSink>,
1636 ) -> Result<String, LeanWorkerError> {
1637 self.round_trip(
1638 "worker_json_command",
1639 Request::JsonCommand {
1640 export: export.to_owned(),
1641 request_json,
1642 },
1643 false,
1644 cancellation,
1645 progress,
1646 |response, operation| match response {
1647 Response::JsonCommand { response_json } => Ok(response_json),
1648 other => Err(unexpected_response(operation, &other)),
1649 },
1650 )
1651 }
1652
1653 fn send_request(&mut self, request: Request) -> Result<(), LeanWorkerError> {
1654 self.ensure_running()?;
1655 let Some(stdin) = self.stdin.as_mut() else {
1656 return Err(self.dead_error());
1657 };
1658 write_frame(stdin, Message::Request(request)).map_err(|err| LeanWorkerError::Protocol {
1659 message: err.to_string(),
1660 })
1661 }
1662
1663 fn prepare_request(&mut self, import_like: bool) -> Result<(), LeanWorkerError> {
1664 self.ensure_running()?;
1665
1666 if let Some(limit) = self.config.restart_policy.max_requests
1667 && self.requests_since_restart >= limit
1668 {
1669 return self.restart_with_reason(LeanWorkerRestartReason::MaxRequests { limit });
1670 }
1671
1672 if import_like
1673 && let Some(limit) = self.config.restart_policy.max_imports
1674 && self.imports_since_restart >= limit
1675 {
1676 return self.restart_with_reason(LeanWorkerRestartReason::MaxImports { limit });
1677 }
1678
1679 if let Some(limit_kib) = self.config.restart_policy.max_rss_kib {
1680 match self.child_rss_kib() {
1681 Some(current_kib) if current_kib >= limit_kib => {
1682 self.stats.last_rss_kib = Some(current_kib);
1683 return self.restart_with_reason(LeanWorkerRestartReason::RssCeiling { current_kib, limit_kib });
1684 }
1685 Some(current_kib) => {
1686 self.stats.last_rss_kib = Some(current_kib);
1687 }
1688 None => {
1689 self.stats.rss_samples_unavailable = self.stats.rss_samples_unavailable.saturating_add(1);
1690 }
1691 }
1692 }
1693
1694 if let Some(limit) = self.config.restart_policy.idle_restart_after {
1695 let idle_for = self.last_activity.elapsed();
1696 if idle_for >= limit {
1697 return self.restart_with_reason(LeanWorkerRestartReason::Idle { idle_for, limit });
1698 }
1699 }
1700
1701 Ok(())
1702 }
1703
1704 fn record_request(&mut self, import_like: bool) {
1705 self.stats.requests = self.stats.requests.saturating_add(1);
1706 self.requests_since_restart = self.requests_since_restart.saturating_add(1);
1707 if import_like {
1708 self.stats.imports = self.stats.imports.saturating_add(1);
1709 self.imports_since_restart = self.imports_since_restart.saturating_add(1);
1710 }
1711 self.last_activity = Instant::now();
1712 }
1713
    /// Replaces the current child with a freshly spawned one, recording `reason`.
    ///
    /// The existing child is stopped first, the restart is recorded in `stats`,
    /// and the since-restart counters are zeroed. A new worker is then spawned
    /// from a clone of the current config, given a copy of the accumulated
    /// stats and a fresh `last_activity`, and assigned over `*self`.
    ///
    /// # Errors
    ///
    /// Returns the error from `stop_existing_child` or from `Self::spawn`. In
    /// the spawn-failure case `self` has already been stopped and the restart
    /// already recorded.
    fn restart_with_reason(&mut self, reason: LeanWorkerRestartReason) -> Result<(), LeanWorkerError> {
        // Cloned up front so the config survives the `*self = next` assignment below.
        let config = self.config.clone();
        self.stop_existing_child()?;
        self.stats.record_restart(reason);
        self.requests_since_restart = 0;
        self.imports_since_restart = 0;
        let mut next = Self::spawn(&config)?;
        // Carry the accumulated statistics over to the replacement worker.
        next.stats = self.stats.clone();
        next.last_activity = Instant::now();
        *self = next;
        Ok(())
    }
1726
1727 fn read_response(&mut self, operation: &'static str) -> Result<Response, LeanWorkerError> {
1728 self.read_response_with_events(operation, None, None, None, None)
1729 }
1730
1731 fn read_response_with_progress(
1732 &mut self,
1733 operation: &'static str,
1734 progress: Option<&dyn LeanWorkerProgressSink>,
1735 cancellation: Option<&LeanWorkerCancellationToken>,
1736 ) -> Result<Response, LeanWorkerError> {
1737 self.read_response_with_events(operation, progress, cancellation, None, None)
1738 }
1739
1740 fn round_trip<R>(
1751 &mut self,
1752 operation: &'static str,
1753 request: Request,
1754 import_like: bool,
1755 cancellation: Option<&LeanWorkerCancellationToken>,
1756 progress: Option<&dyn LeanWorkerProgressSink>,
1757 extract: impl FnOnce(Response, &'static str) -> Result<R, LeanWorkerError>,
1758 ) -> Result<R, LeanWorkerError> {
1759 check_cancelled(operation, cancellation)?;
1760 self.prepare_request(import_like)?;
1761 self.send_request(request)?;
1762 self.record_request(import_like);
1763 let response = self.read_response_with_progress(operation, progress, cancellation)?;
1764 extract(response, operation)
1765 }
1766
1767 fn read_response_with_events(
1768 &mut self,
1769 operation: &'static str,
1770 progress: Option<&dyn LeanWorkerProgressSink>,
1771 cancellation: Option<&LeanWorkerCancellationToken>,
1772 data: Option<LeanWorkerDataSinkTarget<'_>>,
1773 diagnostics: Option<&dyn LeanWorkerDiagnosticSink>,
1774 ) -> Result<Response, LeanWorkerError> {
1775 let started = Instant::now();
1776 let timeout = self.config.request_timeout;
1777 let deadline = started.checked_add(timeout);
1778 let streaming = data.is_some();
1779 let mut request_backpressure_waits = 0_u64;
1780 let stdout = self.stdout.take().ok_or_else(|| self.dead_error())?;
1781 let (sender, receiver) = mpsc::sync_channel(WORKER_EVENT_BUFFER_CAPACITY);
1782 let _reader = thread::spawn(move || read_request_messages(stdout, sender));
1783
1784 loop {
1785 let event = match deadline.and_then(|deadline| deadline.checked_duration_since(Instant::now())) {
1786 Some(remaining) if remaining.is_zero() => {
1787 if streaming {
1788 self.record_stream_failure(started, request_backpressure_waits);
1789 }
1790 self.restart_with_reason(LeanWorkerRestartReason::RequestTimeout {
1791 operation,
1792 duration: timeout,
1793 })?;
1794 return Err(LeanWorkerError::Timeout {
1795 operation,
1796 duration: timeout,
1797 });
1798 }
1799 Some(remaining) => match receiver.recv_timeout(remaining) {
1800 Ok(event) => event,
1801 Err(mpsc::RecvTimeoutError::Timeout) => {
1802 if streaming {
1803 self.record_stream_failure(started, request_backpressure_waits);
1804 }
1805 self.restart_with_reason(LeanWorkerRestartReason::RequestTimeout {
1806 operation,
1807 duration: timeout,
1808 })?;
1809 return Err(LeanWorkerError::Timeout {
1810 operation,
1811 duration: timeout,
1812 });
1813 }
1814 Err(mpsc::RecvTimeoutError::Disconnected) => {
1815 return Err(LeanWorkerError::Protocol {
1816 message: "worker response reader exited without a terminal response".to_owned(),
1817 });
1818 }
1819 },
1820 None => match receiver.recv() {
1821 Ok(event) => event,
1822 Err(_err) => {
1823 return Err(LeanWorkerError::Protocol {
1824 message: "worker response reader exited without a terminal response".to_owned(),
1825 });
1826 }
1827 },
1828 };
1829 request_backpressure_waits = request_backpressure_waits.saturating_add(event.backpressure_waits());
1830 self.stats.backpressure_waits = self.stats.backpressure_waits.saturating_add(event.backpressure_waits());
1831
1832 let message = match event {
1833 RequestReaderEvent::Message { message, .. } => message,
1834 RequestReaderEvent::Terminal { message, stdout, .. } => {
1835 self.stdout = Some(stdout);
1836 match message {
1837 Message::Response(Response::Error { code, message }) => {
1838 if streaming {
1839 self.record_stream_failure(started, request_backpressure_waits);
1840 }
1841 return Err(LeanWorkerError::Worker { code, message });
1842 }
1843 Message::Response(response) => {
1844 if streaming {
1845 if matches!(response, Response::StreamComplete { .. }) {
1846 self.record_stream_success(started);
1847 } else {
1848 self.record_stream_failure(started, request_backpressure_waits);
1849 }
1850 }
1851 return Ok(response);
1852 }
1853 other @ (Message::Handshake { .. }
1854 | Message::Request(_)
1855 | Message::Diagnostic(_)
1856 | Message::ProgressTick(_)
1857 | Message::DataRow(_)
1858 | Message::FatalExit(_)) => {
1859 return Err(LeanWorkerError::Protocol {
1860 message: format!("worker sent unexpected {operation} message: {other:?}"),
1861 });
1862 }
1863 }
1864 }
1865 RequestReaderEvent::ReadError { message, eof, .. } => {
1866 if streaming {
1867 self.record_stream_failure(started, request_backpressure_waits);
1868 }
1869 return if eof {
1870 Err(self.record_exit_error())
1871 } else {
1872 Err(LeanWorkerError::Protocol { message })
1873 };
1874 }
1875 };
1876
1877 match message {
1878 Message::ProgressTick(tick) => {
1879 if let Err(err) =
1880 report_parent_progress(progress, elapsed_event(tick.phase, tick.current, tick.total, started))
1881 {
1882 if streaming {
1883 self.record_stream_failure(started, request_backpressure_waits);
1884 }
1885 return Err(err);
1886 }
1887 if cancellation.is_some_and(LeanWorkerCancellationToken::is_cancelled) {
1888 if streaming {
1889 self.record_stream_failure(started, request_backpressure_waits);
1890 }
1891 self.restart_with_reason(LeanWorkerRestartReason::Cancelled { operation })?;
1892 return Err(LeanWorkerError::Cancelled { operation });
1893 }
1894 }
1895 Message::DataRow(row) => {
1896 let payload_bytes = row.payload.get().len() as u64;
1897 if let Err(err) = report_parent_data_row(data, row) {
1898 if streaming {
1899 self.record_stream_failure(started, request_backpressure_waits);
1900 }
1901 return Err(err);
1902 }
1903 self.stats.data_rows_delivered = self.stats.data_rows_delivered.saturating_add(1);
1904 self.stats.data_row_payload_bytes = self.stats.data_row_payload_bytes.saturating_add(payload_bytes);
1905 if cancellation.is_some_and(LeanWorkerCancellationToken::is_cancelled) {
1906 if streaming {
1907 self.record_stream_failure(started, request_backpressure_waits);
1908 }
1909 self.restart_with_reason(LeanWorkerRestartReason::Cancelled { operation })?;
1910 return Err(LeanWorkerError::Cancelled { operation });
1911 }
1912 }
1913 Message::Diagnostic(diagnostic) => {
1914 if let Err(err) = report_parent_diagnostic(diagnostics, diagnostic.into()) {
1915 if streaming {
1916 self.record_stream_failure(started, request_backpressure_waits);
1917 }
1918 return Err(err);
1919 }
1920 }
1921 Message::Response(response) => return Err(unexpected_response(operation, &response)),
1922 other @ (Message::Handshake { .. } | Message::Request(_) | Message::FatalExit(_)) => {
1923 return Err(LeanWorkerError::Protocol {
1924 message: format!("worker sent unexpected {operation} message: {other:?}"),
1925 });
1926 }
1927 }
1928 }
1929 }
1930
1931 fn ensure_running(&mut self) -> Result<(), LeanWorkerError> {
1932 match self.status()? {
1933 LeanWorkerStatus::Running => Ok(()),
1934 LeanWorkerStatus::Exited(exit) if exit.success => Err(LeanWorkerError::ChildExited { exit }),
1935 LeanWorkerStatus::Exited(exit) => Err(LeanWorkerError::ChildPanicOrAbort { exit }),
1936 }
1937 }
1938
1939 fn record_stream_success(&mut self, started: Instant) {
1940 self.stats.stream_successes = self.stats.stream_successes.saturating_add(1);
1941 self.stats.stream_elapsed = self.stats.stream_elapsed.saturating_add(started.elapsed());
1942 }
1943
1944 fn record_stream_failure(&mut self, started: Instant, backpressure_waits: u64) {
1945 self.stats.stream_failures = self.stats.stream_failures.saturating_add(1);
1946 self.stats.stream_elapsed = self.stats.stream_elapsed.saturating_add(started.elapsed());
1947 if backpressure_waits > 0 {
1948 self.stats.backpressure_failures = self.stats.backpressure_failures.saturating_add(1);
1949 }
1950 }
1951
1952 fn wait_for_exit(&mut self) -> Result<LeanWorkerExit, LeanWorkerError> {
1953 let Some(child) = self.child.as_mut() else {
1954 return Err(self.dead_error());
1955 };
1956 let status = child.wait().map_err(|source| LeanWorkerError::Wait { source })?;
1957 let diagnostics = self.read_stderr();
1958 let exit = LeanWorkerExit::from_status(status, diagnostics);
1959 self.last_exit = Some(exit.clone());
1960 self.child = None;
1961 self.stdin = None;
1962 self.stdout = None;
1963 self.stats.exits = self.stats.exits.saturating_add(1);
1964 Ok(exit)
1965 }
1966
1967 fn try_record_exit(&mut self) -> Option<LeanWorkerExit> {
1968 let child = self.child.as_mut()?;
1969 let status = child.try_wait().ok().flatten()?;
1970 let diagnostics = self.read_stderr();
1971 let exit = LeanWorkerExit::from_status(status, diagnostics);
1972 self.last_exit = Some(exit.clone());
1973 self.child = None;
1974 self.stdin = None;
1975 self.stdout = None;
1976 self.stats.exits = self.stats.exits.saturating_add(1);
1977 Some(exit)
1978 }
1979
1980 fn record_exit_error(&mut self) -> LeanWorkerError {
1981 match self.wait_for_exit() {
1982 Ok(exit) if exit.success => LeanWorkerError::ChildExited { exit },
1983 Ok(exit) => LeanWorkerError::ChildPanicOrAbort { exit },
1984 Err(err) => err,
1985 }
1986 }
1987
1988 fn stop_existing_child(&mut self) -> Result<(), LeanWorkerError> {
1989 if let Some(child) = self.child.as_mut() {
1990 drop(child.kill());
1991 let status = child.wait().map_err(|source| LeanWorkerError::Wait { source })?;
1992 let diagnostics = self.read_stderr();
1993 self.last_exit = Some(LeanWorkerExit::from_status(status, diagnostics));
1994 self.stats.exits = self.stats.exits.saturating_add(1);
1995 }
1996 self.child = None;
1997 self.stdin = None;
1998 self.stdout = None;
1999 Ok(())
2000 }
2001
2002 fn dead_error(&self) -> LeanWorkerError {
2003 let exit = self.last_exit.clone().unwrap_or_else(|| LeanWorkerExit {
2004 success: false,
2005 code: None,
2006 status: "worker is not running".to_owned(),
2007 diagnostics: String::new(),
2008 });
2009 if exit.success {
2010 LeanWorkerError::ChildExited { exit }
2011 } else {
2012 LeanWorkerError::ChildPanicOrAbort { exit }
2013 }
2014 }
2015
2016 fn read_stderr(&mut self) -> String {
2017 let mut diagnostics = String::new();
2018 if let Some(mut pipe) = self.stderr.take() {
2019 drop(pipe.read_to_string(&mut diagnostics));
2020 }
2021 diagnostics
2022 }
2023
2024 fn child_rss_kib(&mut self) -> Option<u64> {
2025 let child = self.child.as_mut()?;
2026 child_rss_kib(child.id())
2027 }
2028}
2029
/// Event sent from the stdout reader thread (`read_request_messages`) to the
/// request loop over a bounded channel.
///
/// Every variant carries `backpressure_waits`. The reader creates events with
/// it set to 0, and `send_reader_event` bumps it when the channel was full at
/// send time.
enum RequestReaderEvent {
    /// A frame that is not a `Message::Response`; the reader keeps reading
    /// after sending it.
    Message {
        message: Message,
        backpressure_waits: u64,
    },
    /// A `Message::Response` frame. The reader stops after sending it and
    /// hands its stdout reader back through `stdout`.
    Terminal {
        message: Message,
        stdout: BufReader<ChildStdout>,
        backpressure_waits: u64,
    },
    /// `read_frame` failed. `message` is the stringified error and `eof` is
    /// the error's `is_eof()` result. The reader stops after sending it.
    ReadError {
        message: String,
        eof: bool,
        backpressure_waits: u64,
    },
}
2046
2047impl RequestReaderEvent {
2048 fn backpressure_waits(&self) -> u64 {
2049 match self {
2050 Self::Message { backpressure_waits, .. }
2051 | Self::Terminal { backpressure_waits, .. }
2052 | Self::ReadError { backpressure_waits, .. } => *backpressure_waits,
2053 }
2054 }
2055
2056 fn add_backpressure_wait(&mut self) {
2057 match self {
2058 Self::Message { backpressure_waits, .. }
2059 | Self::Terminal { backpressure_waits, .. }
2060 | Self::ReadError { backpressure_waits, .. } => {
2061 *backpressure_waits = backpressure_waits.saturating_add(1);
2062 }
2063 }
2064 }
2065}
2066
2067#[allow(
2068 clippy::needless_pass_by_value,
2069 reason = "the request reader thread must own the sender"
2070)]
2071fn read_request_messages(mut stdout: BufReader<ChildStdout>, sender: mpsc::SyncSender<RequestReaderEvent>) {
2072 loop {
2073 match read_frame(&mut stdout) {
2074 Ok(frame) if matches!(frame.message, Message::Response(_)) => {
2075 let _ = send_reader_event(
2076 &sender,
2077 RequestReaderEvent::Terminal {
2078 message: frame.message,
2079 stdout,
2080 backpressure_waits: 0,
2081 },
2082 );
2083 return;
2084 }
2085 Ok(frame) => {
2086 if send_reader_event(
2087 &sender,
2088 RequestReaderEvent::Message {
2089 message: frame.message,
2090 backpressure_waits: 0,
2091 },
2092 )
2093 .is_err()
2094 {
2095 return;
2096 }
2097 }
2098 Err(err) => {
2099 let _ = send_reader_event(
2100 &sender,
2101 RequestReaderEvent::ReadError {
2102 message: err.to_string(),
2103 eof: err.is_eof(),
2104 backpressure_waits: 0,
2105 },
2106 );
2107 return;
2108 }
2109 }
2110 }
2111}
2112
2113fn send_reader_event(sender: &mpsc::SyncSender<RequestReaderEvent>, event: RequestReaderEvent) -> Result<(), ()> {
2114 match sender.try_send(event) {
2115 Ok(()) => Ok(()),
2116 Err(mpsc::TrySendError::Full(mut event)) => {
2117 event.add_backpressure_wait();
2118 sender.send(event).map_err(|_| ())
2119 }
2120 Err(mpsc::TrySendError::Disconnected(_event)) => Err(()),
2121 }
2122}
2123
2124impl Drop for LeanWorker {
2125 fn drop(&mut self) {
2126 if let Some(child) = self.child.as_mut() {
2127 drop(child.kill());
2128 drop(child.wait());
2129 }
2130 }
2131}
2132
2133fn expect_handshake(stdout: &mut BufReader<ChildStdout>) -> Result<LeanWorkerRuntimeMetadata, LeanWorkerError> {
2134 let frame = read_frame(stdout).map_err(|err| {
2135 if err.is_eof() {
2136 LeanWorkerError::Handshake {
2137 message: "child closed stdout before handshake".to_owned(),
2138 }
2139 } else {
2140 LeanWorkerError::Handshake {
2141 message: err.to_string(),
2142 }
2143 }
2144 })?;
2145 match frame.message {
2146 Message::Handshake {
2147 worker_version,
2148 protocol_version,
2149 } if protocol_version == crate::protocol::PROTOCOL_VERSION => Ok(LeanWorkerRuntimeMetadata {
2150 worker_version,
2151 protocol_version,
2152 lean_version: None,
2153 }),
2154 other @ (Message::Handshake { .. }
2155 | Message::Request(_)
2156 | Message::Response(_)
2157 | Message::Diagnostic(_)
2158 | Message::ProgressTick(_)
2159 | Message::DataRow(_)
2160 | Message::FatalExit(_)) => Err(LeanWorkerError::Handshake {
2161 message: format!("unexpected handshake frame: {other:?}"),
2162 }),
2163 }
2164}
2165
2166fn wait_with_stderr(child: &mut Child, stderr: Option<ChildStderr>) -> Result<LeanWorkerExit, LeanWorkerError> {
2167 let status = child.wait().map_err(|source| LeanWorkerError::Wait { source })?;
2168 let mut diagnostics = String::new();
2169 if let Some(mut pipe) = stderr {
2170 drop(pipe.read_to_string(&mut diagnostics));
2171 }
2172 Ok(LeanWorkerExit::from_status(status, diagnostics))
2173}
2174
2175fn unexpected_response(operation: &'static str, response: &Response) -> LeanWorkerError {
2176 LeanWorkerError::Protocol {
2177 message: format!("worker sent unexpected {operation} response: {response:?}"),
2178 }
2179}
2180
/// Converts `path` into an owned `String`, replacing anything that is not valid Unicode with
/// U+FFFD.
fn path_string(path: &Path) -> String {
    match path.to_str() {
        // Valid Unicode: copy it verbatim.
        Some(valid) => valid.to_owned(),
        // Otherwise fall back to the lossy conversion.
        None => path.to_string_lossy().into_owned(),
    }
}
2184
/// Accumulates string rows delivered through the `LeanWorkerRawDataSink` interface.
///
/// Both fields sit behind a `Mutex` because the sink's `report` method only receives `&self`.
#[derive(Debug, Default)]
struct DeclarationNameCollector {
    /// Names decoded so far, in the order their rows were reported.
    names: Mutex<Vec<String>>,
    /// Description of the first row whose payload failed to decode as a JSON string, if any.
    // NOTE(review): nothing in the visible part of this file reads this slot back; confirm
    // that some caller actually surfaces it.
    decode_error: Mutex<Option<String>>,
}
2197
2198impl DeclarationNameCollector {
2199 fn into_inner(self) -> Vec<String> {
2200 self.names.into_inner().unwrap_or_default()
2201 }
2202}
2203
2204impl LeanWorkerRawDataSink for DeclarationNameCollector {
2205 fn report(&self, row: LeanWorkerRawDataRow) {
2206 match serde_json::from_str::<String>(row.payload.get()) {
2207 Ok(name) => {
2208 if let Ok(mut guard) = self.names.lock() {
2209 guard.push(name);
2210 }
2211 }
2212 Err(err) => {
2213 if let Ok(mut slot) = self.decode_error.lock()
2214 && slot.is_none()
2215 {
2216 *slot = Some(format!("list_declarations_strings row payload decode failed: {err}"));
2217 }
2218 }
2219 }
2220 }
2221}
2222
/// Looks up the `VmRSS:` value of process `pid` in `/proc/<pid>/status`.
///
/// Returns `None` when the status file cannot be read or contains no `VmRSS:` line whose
/// first field parses as an unsigned integer.
#[cfg(target_os = "linux")]
fn child_rss_kib(pid: u32) -> Option<u64> {
    let status_path = format!("/proc/{pid}/status");
    let contents = std::fs::read_to_string(status_path).ok()?;
    for line in contents.lines() {
        let Some(rest) = line.strip_prefix("VmRSS:") else {
            continue;
        };
        let Some(field) = rest.split_whitespace().next() else {
            continue;
        };
        // A field that does not parse does not end the scan; later lines are still examined.
        if let Ok(value) = field.parse::<u64>() {
            return Some(value);
        }
    }
    None
}
2231
/// Asks `ps -o rss= -p <pid>` for the resident set size of process `pid`.
///
/// Returns `None` when `ps` cannot be run, exits unsuccessfully, or prints something that is
/// not a positive integer.
#[cfg(not(target_os = "linux"))]
fn child_rss_kib(pid: u32) -> Option<u64> {
    let pid_arg = pid.to_string();
    let Ok(output) = Command::new("ps")
        .arg("-o")
        .arg("rss=")
        .arg("-p")
        .arg(&pid_arg)
        .output()
    else {
        return None;
    };
    if !output.status.success() {
        return None;
    }

    let printed = String::from_utf8_lossy(&output.stdout);
    let value = printed.trim().parse::<u64>().ok()?;
    // A reported size of zero is treated as "no measurement".
    (value > 0).then_some(value)
}