1use std::ffi::OsString;
2use std::fmt;
3use std::io::{BufReader, BufWriter, Read as _};
4use std::path::{Path, PathBuf};
5use std::process::{Child, ChildStderr, ChildStdin, ChildStdout, Command, ExitStatus, Stdio};
6use std::sync::mpsc;
7use std::thread;
8use std::time::{Duration, Instant};
9
10use crate::capability::LeanWorkerBootstrapDiagnosticCode;
11use crate::protocol::{Message, Request, Response, read_frame, write_frame};
12use crate::session::LeanWorkerDataSinkTarget;
13use crate::session::{
14 LeanWorkerCancellationToken, LeanWorkerCapabilityMetadata, LeanWorkerDataSink, LeanWorkerDiagnosticSink,
15 LeanWorkerDoctorReport, LeanWorkerElabOptions, LeanWorkerElabResult, LeanWorkerKernelResult,
16 LeanWorkerProgressSink, LeanWorkerRawDataSink, LeanWorkerRuntimeMetadata, LeanWorkerSessionConfig,
17 LeanWorkerStreamSummary, check_cancelled, elapsed_event, report_parent_data_row, report_parent_diagnostic,
18 report_parent_progress,
19};
20
/// Startup handshake deadline used by `LeanWorkerConfig::new` unless the
/// caller overrides it with `LeanWorkerConfig::startup_timeout`.
const DEFAULT_STARTUP_TIMEOUT: Duration = Duration::from_secs(10);
/// Capacity for buffered worker events.
/// NOTE(review): not referenced in the visible part of this file — confirm
/// which channel/queue it bounds.
const WORKER_EVENT_BUFFER_CAPACITY: usize = 64;

/// Per-request timeout installed by `LeanWorkerConfig::new`.
pub const LEAN_WORKER_REQUEST_TIMEOUT_DEFAULT: Duration = Duration::from_secs(30);

/// Per-request timeout installed by `LeanWorkerConfig::long_running_requests`
/// (ten minutes).
pub const LEAN_WORKER_REQUEST_TIMEOUT_LONG_RUNNING: Duration = Duration::from_secs(10 * 60);
29
30#[derive(Clone, Debug)]
52pub struct LeanWorkerConfig {
53 executable: PathBuf,
54 current_dir: Option<PathBuf>,
55 env: Vec<(OsString, OsString)>,
56 startup_timeout: Duration,
57 request_timeout: Duration,
58 restart_policy: LeanWorkerRestartPolicy,
59}
60
61impl LeanWorkerConfig {
62 pub fn new(executable: impl Into<PathBuf>) -> Self {
64 Self {
65 executable: executable.into(),
66 current_dir: None,
67 env: Vec::new(),
68 startup_timeout: DEFAULT_STARTUP_TIMEOUT,
69 request_timeout: LEAN_WORKER_REQUEST_TIMEOUT_DEFAULT,
70 restart_policy: LeanWorkerRestartPolicy::default(),
71 }
72 }
73
74 pub fn executable(&self) -> &Path {
76 &self.executable
77 }
78
79 #[must_use]
81 pub fn current_dir(mut self, path: impl Into<PathBuf>) -> Self {
82 self.current_dir = Some(path.into());
83 self
84 }
85
86 #[must_use]
88 pub fn env(mut self, key: impl Into<OsString>, value: impl Into<OsString>) -> Self {
89 self.env.push((key.into(), value.into()));
90 self
91 }
92
93 #[must_use]
95 pub fn startup_timeout(mut self, timeout: Duration) -> Self {
96 self.startup_timeout = timeout;
97 self
98 }
99
100 #[must_use]
106 pub fn request_timeout(mut self, timeout: Duration) -> Self {
107 self.request_timeout = timeout;
108 self
109 }
110
111 #[must_use]
113 pub fn long_running_requests(mut self) -> Self {
114 self.request_timeout = LEAN_WORKER_REQUEST_TIMEOUT_LONG_RUNNING;
115 self
116 }
117
118 #[must_use]
124 pub fn restart_policy(mut self, policy: LeanWorkerRestartPolicy) -> Self {
125 self.restart_policy = policy;
126 self
127 }
128}
129
/// Limits after which a worker should be recycled.
///
/// Every limit starts as `None`; [`LeanWorkerRestartPolicy::disabled`] returns
/// that all-`None` policy. Count and size limits are clamped to at least 1.
#[derive(Clone, Debug, Default, Eq, PartialEq)]
pub struct LeanWorkerRestartPolicy {
    /// Request-count limit (`None` when unset).
    max_requests: Option<u64>,
    /// Import-count limit (`None` when unset).
    max_imports: Option<u64>,
    /// Resident-set-size ceiling in KiB (`None` when unset).
    max_rss_kib: Option<u64>,
    /// Idle duration limit (`None` when unset).
    idle_restart_after: Option<Duration>,
}

impl LeanWorkerRestartPolicy {
    /// A policy with every limit unset.
    #[must_use]
    pub fn disabled() -> Self {
        Self::default()
    }

    /// Sets the request-count limit; `0` is treated as `1`.
    #[must_use]
    pub fn max_requests(self, limit: u64) -> Self {
        Self {
            max_requests: Self::at_least_one(limit),
            ..self
        }
    }

    /// Sets the import-count limit; `0` is treated as `1`.
    #[must_use]
    pub fn max_imports(self, limit: u64) -> Self {
        Self {
            max_imports: Self::at_least_one(limit),
            ..self
        }
    }

    /// Sets the RSS ceiling in KiB; `0` is treated as `1`.
    #[must_use]
    pub fn max_rss_kib(self, limit: u64) -> Self {
        Self {
            max_rss_kib: Self::at_least_one(limit),
            ..self
        }
    }

    /// Sets the idle duration limit (not clamped).
    #[must_use]
    pub fn idle_restart_after(self, duration: Duration) -> Self {
        Self {
            idle_restart_after: Some(duration),
            ..self
        }
    }

    /// Wraps `limit` in `Some`, raising `0` to `1`.
    fn at_least_one(limit: u64) -> Option<u64> {
        Some(std::cmp::max(limit, 1))
    }
}
183
/// Why a worker was restarted.
#[derive(Clone, Debug, Eq, PartialEq)]
pub enum LeanWorkerRestartReason {
    /// Restart requested explicitly (e.g. via `LeanWorker::cycle`).
    Explicit,
    /// The request-count limit was reached.
    MaxRequests { limit: u64 },
    /// The import-count limit was reached.
    MaxImports { limit: u64 },
    /// The resident-set-size ceiling was exceeded.
    RssCeiling { current_kib: u64, limit_kib: u64 },
    /// The worker stayed idle longer than the configured limit.
    Idle { idle_for: Duration, limit: Duration },
    /// An operation was cancelled.
    Cancelled { operation: &'static str },
    /// An operation exceeded its request timeout.
    RequestTimeout {
        operation: &'static str,
        duration: Duration,
    },
}

/// Counters describing a worker's lifetime activity.
///
/// All counters are plain `u64`s; `record_restart` updates its counters with
/// saturating arithmetic.
#[derive(Clone, Debug, Default, Eq, PartialEq)]
pub struct LeanWorkerStats {
    // --- request / import / exit counters ---
    pub requests: u64,
    pub imports: u64,
    pub exits: u64,
    // --- restart counters: `restarts` is the total, the rest split it by reason ---
    pub restarts: u64,
    pub explicit_cycles: u64,
    pub max_request_restarts: u64,
    pub max_import_restarts: u64,
    pub rss_restarts: u64,
    pub idle_restarts: u64,
    pub cancelled_restarts: u64,
    pub timeout_restarts: u64,
    // --- RSS sampling ---
    pub rss_samples_unavailable: u64,
    pub last_rss_kib: Option<u64>,
    /// Most recent restart cause, if any restart happened.
    pub last_restart_reason: Option<LeanWorkerRestartReason>,
    // --- streaming statistics (updated by code paths outside this view) ---
    pub stream_requests: u64,
    pub stream_successes: u64,
    pub stream_failures: u64,
    pub data_rows_delivered: u64,
    pub data_row_payload_bytes: u64,
    pub stream_elapsed: Duration,
    pub backpressure_waits: u64,
    pub backpressure_failures: u64,
}

impl LeanWorkerStats {
    /// Counts one restart: bumps the total, bumps the counter belonging to
    /// `reason`, then remembers `reason` as the latest cause.
    fn record_restart(&mut self, reason: LeanWorkerRestartReason) {
        self.restarts = self.restarts.saturating_add(1);
        let per_reason = match &reason {
            LeanWorkerRestartReason::Explicit => &mut self.explicit_cycles,
            LeanWorkerRestartReason::MaxRequests { .. } => &mut self.max_request_restarts,
            LeanWorkerRestartReason::MaxImports { .. } => &mut self.max_import_restarts,
            LeanWorkerRestartReason::RssCeiling { .. } => &mut self.rss_restarts,
            LeanWorkerRestartReason::Idle { .. } => &mut self.idle_restarts,
            LeanWorkerRestartReason::Cancelled { .. } => &mut self.cancelled_restarts,
            LeanWorkerRestartReason::RequestTimeout { .. } => &mut self.timeout_restarts,
        };
        *per_reason = per_reason.saturating_add(1);
        self.last_restart_reason = Some(reason);
    }
}
284
/// Liveness of a worker child as observed by `LeanWorker::status`.
#[derive(Clone, Debug, Eq, PartialEq)]
pub enum LeanWorkerStatus {
    /// The child has not exited yet.
    Running,
    /// The child is gone; carries its recorded exit.
    Exited(LeanWorkerExit),
}

/// Snapshot of how a worker child exited.
#[derive(Clone, Debug, Eq, PartialEq)]
pub struct LeanWorkerExit {
    /// `ExitStatus::success()` of the child.
    pub success: bool,
    /// `ExitStatus::code()`; `None` when the platform reports no code.
    pub code: Option<i32>,
    /// `Display` rendering of the exit status.
    pub status: String,
    /// Captured diagnostics text (stderr contents where available).
    pub diagnostics: String,
}

impl LeanWorkerExit {
    /// Converts an OS exit status plus captured diagnostics into a snapshot.
    fn from_status(status: ExitStatus, diagnostics: String) -> Self {
        let success = status.success();
        let code = status.code();
        Self {
            status: status.to_string(),
            success,
            code,
            diagnostics,
        }
    }
}
317
318#[derive(Debug)]
320pub enum LeanWorkerError {
321 Spawn {
323 executable: PathBuf,
324 source: std::io::Error,
325 },
326 WorkerChildUnresolved {
328 tried: Vec<PathBuf>,
330 },
331 WorkerChildNotExecutable { path: PathBuf, reason: String },
333 Bootstrap {
335 code: LeanWorkerBootstrapDiagnosticCode,
336 message: String,
337 },
338 CapabilityBuild {
340 diagnostic: lean_toolchain::LinkDiagnostics,
342 },
343 Setup { message: String },
345 Handshake { message: String },
347 Protocol { message: String },
349 Worker { code: String, message: String },
351 ChildExited { exit: LeanWorkerExit },
353 ChildPanicOrAbort { exit: LeanWorkerExit },
355 Timeout {
357 operation: &'static str,
358 duration: Duration,
359 },
360 Cancelled { operation: &'static str },
362 ProgressPanic { message: String },
364 DataSinkPanic { message: String },
366 DiagnosticSinkPanic { message: String },
368 StreamExportFailed { status: u8 },
370 StreamCallbackFailed { status: u8, description: String },
372 StreamRowMalformed { message: String },
374 CapabilityMetadataMalformed { message: String },
376 CapabilityMetadataMismatch {
378 export: String,
379 expected: Box<LeanWorkerCapabilityMetadata>,
380 actual: Box<LeanWorkerCapabilityMetadata>,
381 },
382 CapabilityDoctorMalformed { message: String },
384 TypedCommandRequestEncode { export: String, message: String },
386 TypedCommandResponseDecode { export: String, message: String },
388 TypedCommandRowDecode {
390 export: String,
391 stream: String,
392 sequence: u64,
393 message: String,
394 },
395 TypedCommandSummaryDecode { export: String, message: String },
397 LeaseInvalidated { reason: String },
399 WorkerPoolExhausted { max_workers: usize },
401 WorkerPoolMemoryBudgetExceeded { current_kib: u64, limit_kib: u64 },
403 WorkerPoolQueueTimeout { waited: Duration },
405 UnsupportedRequest { operation: &'static str },
407 Wait { source: std::io::Error },
409}
410
411impl fmt::Display for LeanWorkerError {
412 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
413 match self {
414 Self::Spawn { executable, source } => {
415 write!(f, "failed to spawn worker {}: {source}", executable.display())
416 }
417 Self::WorkerChildUnresolved { tried } => {
418 let tried = tried
419 .iter()
420 .map(|path| path.display().to_string())
421 .collect::<Vec<_>>()
422 .join(", ");
423 write!(
424 f,
425 "could not resolve lean-rs-worker-child; set LEAN_RS_WORKER_CHILD or place it beside the current executable (tried: {tried})"
426 )
427 }
428 Self::WorkerChildNotExecutable { path, reason } => {
429 write!(f, "worker child '{}' is not executable: {reason}", path.display())
430 }
431 Self::Bootstrap { code, message } => {
432 write!(f, "worker bootstrap check {code} failed: {message}")
433 }
434 Self::CapabilityBuild { diagnostic } => {
435 write!(f, "worker capability Lake target build failed: {diagnostic}")
436 }
437 Self::Setup { message } => write!(f, "worker child setup failed: {message}"),
438 Self::Handshake { message } => write!(f, "worker handshake failed: {message}"),
439 Self::Protocol { message } => write!(f, "worker protocol failed: {message}"),
440 Self::Worker { code, message } => write!(f, "worker returned {code}: {message}"),
441 Self::ChildExited { exit } => write!(f, "worker exited with {}", exit.status),
442 Self::ChildPanicOrAbort { exit } => {
443 write!(f, "worker exited fatally with {}", exit.status)
444 }
445 Self::Timeout { operation, duration } => {
446 write!(f, "worker operation {operation} timed out after {duration:?}")
447 }
448 Self::Cancelled { operation } => write!(f, "worker operation {operation} was cancelled"),
449 Self::ProgressPanic { message } => write!(f, "worker progress sink panicked: {message}"),
450 Self::DataSinkPanic { message } => write!(f, "worker data sink panicked: {message}"),
451 Self::DiagnosticSinkPanic { message } => {
452 write!(f, "worker diagnostic sink panicked: {message}")
453 }
454 Self::StreamExportFailed { status } => write!(f, "streaming export returned status {status}"),
455 Self::StreamCallbackFailed { status, description } => {
456 write!(f, "streaming callback failed with status {status}: {description}")
457 }
458 Self::StreamRowMalformed { message } => write!(f, "streaming export emitted malformed row: {message}"),
459 Self::CapabilityMetadataMalformed { message } => {
460 write!(f, "capability metadata export returned malformed JSON: {message}")
461 }
462 Self::CapabilityMetadataMismatch { export, .. } => {
463 write!(f, "capability metadata from {export} did not match expectation")
464 }
465 Self::CapabilityDoctorMalformed { message } => {
466 write!(f, "capability doctor export returned malformed JSON: {message}")
467 }
468 Self::TypedCommandRequestEncode { export, message } => {
469 write!(f, "typed worker command {export} request JSON encode failed: {message}")
470 }
471 Self::TypedCommandResponseDecode { export, message } => {
472 write!(
473 f,
474 "typed worker command {export} response JSON decode failed: {message}"
475 )
476 }
477 Self::TypedCommandRowDecode {
478 export,
479 stream,
480 sequence,
481 message,
482 } => {
483 write!(
484 f,
485 "typed worker command {export} row decode failed at stream {stream} sequence {sequence}: {message}"
486 )
487 }
488 Self::TypedCommandSummaryDecode { export, message } => {
489 write!(
490 f,
491 "typed worker command {export} terminal summary decode failed: {message}"
492 )
493 }
494 Self::LeaseInvalidated { reason } => write!(f, "worker pool lease was invalidated: {reason}"),
495 Self::WorkerPoolExhausted { max_workers } => {
496 write!(
497 f,
498 "worker pool cannot admit another session key; max_workers={max_workers}"
499 )
500 }
501 Self::WorkerPoolMemoryBudgetExceeded { current_kib, limit_kib } => {
502 write!(
503 f,
504 "worker pool cannot admit work within RSS budget; current_kib={current_kib} limit_kib={limit_kib}"
505 )
506 }
507 Self::WorkerPoolQueueTimeout { waited } => {
508 write!(f, "worker pool admission timed out after {waited:?}")
509 }
510 Self::UnsupportedRequest { operation } => {
511 write!(f, "worker operation {operation} is not supported")
512 }
513 Self::Wait { source } => write!(f, "failed to wait for worker child: {source}"),
514 }
515 }
516}
517
518impl std::error::Error for LeanWorkerError {
519 fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
520 match self {
521 Self::Spawn { source, .. } | Self::Wait { source } => Some(source),
522 Self::CapabilityBuild { diagnostic } => Some(diagnostic),
523 Self::WorkerChildUnresolved { .. } | Self::WorkerChildNotExecutable { .. } | Self::Bootstrap { .. } => None,
524 Self::Setup { .. }
525 | Self::Handshake { .. }
526 | Self::Protocol { .. }
527 | Self::Worker { .. }
528 | Self::ChildExited { .. }
529 | Self::ChildPanicOrAbort { .. }
530 | Self::Timeout { .. }
531 | Self::Cancelled { .. }
532 | Self::ProgressPanic { .. }
533 | Self::DataSinkPanic { .. }
534 | Self::DiagnosticSinkPanic { .. }
535 | Self::StreamExportFailed { .. }
536 | Self::StreamCallbackFailed { .. }
537 | Self::StreamRowMalformed { .. }
538 | Self::CapabilityMetadataMalformed { .. }
539 | Self::CapabilityMetadataMismatch { .. }
540 | Self::CapabilityDoctorMalformed { .. }
541 | Self::TypedCommandRequestEncode { .. }
542 | Self::TypedCommandResponseDecode { .. }
543 | Self::TypedCommandRowDecode { .. }
544 | Self::TypedCommandSummaryDecode { .. }
545 | Self::LeaseInvalidated { .. }
546 | Self::WorkerPoolExhausted { .. }
547 | Self::WorkerPoolMemoryBudgetExceeded { .. }
548 | Self::WorkerPoolQueueTimeout { .. }
549 | Self::UnsupportedRequest { .. } => None,
550 }
551 }
552}
553
554#[derive(Debug)]
560pub struct LeanWorker {
561 config: LeanWorkerConfig,
562 child: Option<Child>,
563 stdin: Option<BufWriter<ChildStdin>>,
564 stdout: Option<BufReader<ChildStdout>>,
565 stderr: Option<ChildStderr>,
566 last_exit: Option<LeanWorkerExit>,
567 runtime_metadata: LeanWorkerRuntimeMetadata,
568 stats: LeanWorkerStats,
569 requests_since_restart: u64,
570 imports_since_restart: u64,
571 last_activity: Instant,
572}
573
574impl LeanWorker {
575 pub fn spawn(config: &LeanWorkerConfig) -> Result<Self, LeanWorkerError> {
583 let mut command = Command::new(&config.executable);
584 command
585 .stdin(Stdio::piped())
586 .stdout(Stdio::piped())
587 .stderr(Stdio::piped())
588 .env("LEAN_ABORT_ON_PANIC", "1")
589 .env("LEAN_BACKTRACE", "0")
590 .env("RUST_BACKTRACE", "0");
591
592 if let Some(current_dir) = &config.current_dir {
593 command.current_dir(current_dir);
594 }
595 for (key, value) in &config.env {
596 command.env(key, value);
597 }
598
599 let mut child = command.spawn().map_err(|source| LeanWorkerError::Spawn {
600 executable: config.executable.clone(),
601 source,
602 })?;
603
604 let stdin = child
605 .stdin
606 .take()
607 .map(BufWriter::new)
608 .ok_or_else(|| LeanWorkerError::Setup {
609 message: "child stdin unavailable".to_owned(),
610 })?;
611 let stdout = child.stdout.take().ok_or_else(|| LeanWorkerError::Setup {
612 message: "child stdout unavailable".to_owned(),
613 })?;
614 let stderr = child.stderr.take();
615
616 let (sender, receiver) = mpsc::channel();
617 let _handshake_reader = thread::spawn(move || {
618 let mut stdout = BufReader::new(stdout);
619 let result = expect_handshake(&mut stdout);
620 drop(sender.send((stdout, result)));
621 });
622
623 let (stdout, runtime_metadata) = match receiver.recv_timeout(config.startup_timeout) {
624 Ok((stdout, Ok(metadata))) => (stdout, metadata),
625 Ok((_stdout, Err(err))) => {
626 let mut worker = Self {
627 config: config.clone(),
628 child: Some(child),
629 stdin: Some(stdin),
630 stdout: None,
631 stderr,
632 last_exit: None,
633 runtime_metadata: LeanWorkerRuntimeMetadata {
634 worker_version: String::new(),
635 protocol_version: crate::protocol::PROTOCOL_VERSION,
636 lean_version: None,
637 },
638 stats: LeanWorkerStats::default(),
639 requests_since_restart: 0,
640 imports_since_restart: 0,
641 last_activity: Instant::now(),
642 };
643 let exit = worker.try_record_exit();
644 return Err(match exit {
645 Some(exit) if !exit.success => LeanWorkerError::ChildPanicOrAbort { exit },
646 Some(exit) => LeanWorkerError::ChildExited { exit },
647 None => err,
648 });
649 }
650 Err(mpsc::RecvTimeoutError::Timeout) => {
651 drop(child.kill());
652 let _exit = wait_with_stderr(&mut child, stderr)?;
653 return Err(LeanWorkerError::Timeout {
654 operation: "startup",
655 duration: config.startup_timeout,
656 });
657 }
658 Err(mpsc::RecvTimeoutError::Disconnected) => {
659 return Err(LeanWorkerError::Handshake {
660 message: "handshake reader exited without a result".to_owned(),
661 });
662 }
663 };
664
665 Ok(Self {
666 config: config.clone(),
667 child: Some(child),
668 stdin: Some(stdin),
669 stdout: Some(stdout),
670 stderr,
671 last_exit: None,
672 runtime_metadata,
673 stats: LeanWorkerStats::default(),
674 requests_since_restart: 0,
675 imports_since_restart: 0,
676 last_activity: Instant::now(),
677 })
678 }
679
680 pub fn health(&mut self) -> Result<(), LeanWorkerError> {
687 self.prepare_request(false)?;
688 self.send_request(Request::Health)?;
689 self.record_request(false);
690 match self.read_response("health")? {
691 Response::HealthOk => Ok(()),
692 other @ (Response::CapabilityLoaded
693 | Response::U64 { .. }
694 | Response::HostSessionOpened
695 | Response::Elaboration { .. }
696 | Response::KernelCheck { .. }
697 | Response::Strings { .. }
698 | Response::StreamComplete { .. }
699 | Response::StreamExportFailed { .. }
700 | Response::StreamCallbackFailed { .. }
701 | Response::StreamRowMalformed { .. }
702 | Response::CapabilityMetadata { .. }
703 | Response::CapabilityDoctor { .. }
704 | Response::CapabilityMetadataMalformed { .. }
705 | Response::CapabilityDoctorMalformed { .. }
706 | Response::JsonCommand { .. }
707 | Response::RowsComplete { .. }
708 | Response::Terminating
709 | Response::Error { .. }) => Err(unexpected_response("health", &other)),
710 }
711 }
712
713 pub fn load_fixture_capability(&mut self, fixture_root: impl AsRef<Path>) -> Result<(), LeanWorkerError> {
724 self.prepare_request(true)?;
725 self.send_request(Request::LoadFixtureCapability {
726 fixture_root: path_string(fixture_root.as_ref()),
727 })?;
728 self.record_request(true);
729 match self.read_response("load_fixture_capability")? {
730 Response::CapabilityLoaded => Ok(()),
731 other @ (Response::HealthOk
732 | Response::U64 { .. }
733 | Response::HostSessionOpened
734 | Response::Elaboration { .. }
735 | Response::KernelCheck { .. }
736 | Response::Strings { .. }
737 | Response::StreamComplete { .. }
738 | Response::StreamExportFailed { .. }
739 | Response::StreamCallbackFailed { .. }
740 | Response::StreamRowMalformed { .. }
741 | Response::CapabilityMetadata { .. }
742 | Response::CapabilityDoctor { .. }
743 | Response::CapabilityMetadataMalformed { .. }
744 | Response::CapabilityDoctorMalformed { .. }
745 | Response::JsonCommand { .. }
746 | Response::RowsComplete { .. }
747 | Response::Terminating
748 | Response::Error { .. }) => Err(unexpected_response("load_fixture_capability", &other)),
749 }
750 }
751
752 pub fn call_fixture_mul(
759 &mut self,
760 fixture_root: impl AsRef<Path>,
761 lhs: u64,
762 rhs: u64,
763 ) -> Result<u64, LeanWorkerError> {
764 self.prepare_request(true)?;
765 self.send_request(Request::CallFixtureMul {
766 fixture_root: path_string(fixture_root.as_ref()),
767 lhs,
768 rhs,
769 })?;
770 self.record_request(true);
771 match self.read_response("call_fixture_mul")? {
772 Response::U64 { value } => Ok(value),
773 other @ (Response::HealthOk
774 | Response::CapabilityLoaded
775 | Response::HostSessionOpened
776 | Response::Elaboration { .. }
777 | Response::KernelCheck { .. }
778 | Response::Strings { .. }
779 | Response::StreamComplete { .. }
780 | Response::StreamExportFailed { .. }
781 | Response::StreamCallbackFailed { .. }
782 | Response::StreamRowMalformed { .. }
783 | Response::CapabilityMetadata { .. }
784 | Response::CapabilityDoctor { .. }
785 | Response::CapabilityMetadataMalformed { .. }
786 | Response::CapabilityDoctorMalformed { .. }
787 | Response::JsonCommand { .. }
788 | Response::RowsComplete { .. }
789 | Response::Terminating
790 | Response::Error { .. }) => Err(unexpected_response("call_fixture_mul", &other)),
791 }
792 }
793
794 pub fn status(&mut self) -> Result<LeanWorkerStatus, LeanWorkerError> {
800 if let Some(exit) = &self.last_exit {
801 return Ok(LeanWorkerStatus::Exited(exit.clone()));
802 }
803 let Some(child) = self.child.as_mut() else {
804 return Ok(LeanWorkerStatus::Exited(LeanWorkerExit {
805 success: false,
806 code: None,
807 status: "worker is not running".to_owned(),
808 diagnostics: String::new(),
809 }));
810 };
811 match child.try_wait().map_err(|source| LeanWorkerError::Wait { source })? {
812 Some(status) => {
813 let diagnostics = self.read_stderr();
814 let exit = LeanWorkerExit::from_status(status, diagnostics);
815 self.last_exit = Some(exit.clone());
816 self.child = None;
817 self.stdin = None;
818 self.stdout = None;
819 self.stats.exits = self.stats.exits.saturating_add(1);
820 Ok(LeanWorkerStatus::Exited(exit))
821 }
822 None => Ok(LeanWorkerStatus::Running),
823 }
824 }
825
826 #[must_use]
828 pub fn stats(&self) -> LeanWorkerStats {
829 self.stats.clone()
830 }
831
832 #[must_use]
834 pub fn runtime_metadata(&self) -> LeanWorkerRuntimeMetadata {
835 self.runtime_metadata.clone()
836 }
837
838 pub fn rss_kib(&mut self) -> Option<u64> {
844 match self.child_rss_kib() {
845 Some(value) => {
846 self.stats.last_rss_kib = Some(value);
847 Some(value)
848 }
849 None => {
850 self.stats.rss_samples_unavailable = self.stats.rss_samples_unavailable.saturating_add(1);
851 None
852 }
853 }
854 }
855
856 #[must_use]
858 pub fn request_timeout(&self) -> Duration {
859 self.config.request_timeout
860 }
861
862 pub fn set_request_timeout(&mut self, timeout: Duration) {
867 self.config.request_timeout = timeout;
868 }
869
870 pub fn cycle(&mut self) -> Result<(), LeanWorkerError> {
881 self.restart_with_reason(LeanWorkerRestartReason::Explicit)
882 }
883
884 pub(crate) fn cycle_with_restart_reason(&mut self, reason: LeanWorkerRestartReason) -> Result<(), LeanWorkerError> {
885 self.restart_with_reason(reason)
886 }
887
888 pub fn restart(&mut self) -> Result<(), LeanWorkerError> {
899 self.cycle()
900 }
901
902 #[doc(hidden)]
903 pub fn __kill_for_test(&mut self) -> Result<(), LeanWorkerError> {
910 let Some(child) = self.child.as_mut() else {
911 return Err(self.dead_error());
912 };
913 child.kill().map_err(|source| LeanWorkerError::Wait { source })?;
914 Ok(())
915 }
916
917 pub fn terminate(mut self) -> Result<LeanWorkerExit, LeanWorkerError> {
924 self.send_request(Request::Terminate)?;
925 match self.read_response("terminate")? {
926 Response::Terminating => self.wait_for_exit(),
927 other @ (Response::HealthOk
928 | Response::CapabilityLoaded
929 | Response::U64 { .. }
930 | Response::HostSessionOpened
931 | Response::Elaboration { .. }
932 | Response::KernelCheck { .. }
933 | Response::Strings { .. }
934 | Response::StreamComplete { .. }
935 | Response::StreamExportFailed { .. }
936 | Response::StreamCallbackFailed { .. }
937 | Response::StreamRowMalformed { .. }
938 | Response::CapabilityMetadata { .. }
939 | Response::CapabilityDoctor { .. }
940 | Response::CapabilityMetadataMalformed { .. }
941 | Response::CapabilityDoctorMalformed { .. }
942 | Response::JsonCommand { .. }
943 | Response::RowsComplete { .. }
944 | Response::Error { .. }) => Err(unexpected_response("terminate", &other)),
945 }
946 }
947
948 #[doc(hidden)]
949 pub fn __trigger_lean_panic_fixture(
956 mut self,
957 fixture_root: impl AsRef<Path>,
958 ) -> Result<LeanWorkerExit, LeanWorkerError> {
959 self.prepare_request(true)?;
960 self.send_request(Request::TriggerLeanPanic {
961 fixture_root: path_string(fixture_root.as_ref()),
962 })?;
963 self.record_request(true);
964 match self.read_response("trigger_lean_panic") {
965 Ok(response) => Err(unexpected_response("trigger_lean_panic", &response)),
966 Err(LeanWorkerError::ChildPanicOrAbort { exit }) => Ok(exit),
967 Err(err) => Err(err),
968 }
969 }
970
971 #[doc(hidden)]
972 pub fn __emit_test_rows(
979 &mut self,
980 streams: Vec<String>,
981 cancellation: Option<&LeanWorkerCancellationToken>,
982 data: Option<&dyn LeanWorkerDataSink>,
983 ) -> Result<u64, LeanWorkerError> {
984 const OPERATION: &str = "emit_test_rows";
985 check_cancelled(OPERATION, cancellation)?;
986 self.prepare_request(false)?;
987 self.send_request(Request::EmitTestRows { streams })?;
988 self.record_request(false);
989 match self.read_response_with_events(
990 OPERATION,
991 None,
992 cancellation,
993 data.map(LeanWorkerDataSinkTarget::Value),
994 None,
995 )? {
996 Response::RowsComplete { count } => Ok(count),
997 other @ (Response::HealthOk
998 | Response::CapabilityLoaded
999 | Response::U64 { .. }
1000 | Response::HostSessionOpened
1001 | Response::Elaboration { .. }
1002 | Response::KernelCheck { .. }
1003 | Response::Strings { .. }
1004 | Response::StreamComplete { .. }
1005 | Response::StreamExportFailed { .. }
1006 | Response::StreamCallbackFailed { .. }
1007 | Response::StreamRowMalformed { .. }
1008 | Response::CapabilityMetadata { .. }
1009 | Response::CapabilityDoctor { .. }
1010 | Response::CapabilityMetadataMalformed { .. }
1011 | Response::CapabilityDoctorMalformed { .. }
1012 | Response::JsonCommand { .. }
1013 | Response::Terminating
1014 | Response::Error { .. }) => Err(unexpected_response(OPERATION, &other)),
1015 }
1016 }
1017
1018 pub(crate) fn open_worker_session(
1019 &mut self,
1020 config: &LeanWorkerSessionConfig,
1021 cancellation: Option<&LeanWorkerCancellationToken>,
1022 progress: Option<&dyn LeanWorkerProgressSink>,
1023 ) -> Result<(), LeanWorkerError> {
1024 const OPERATION: &str = "open_worker_session";
1025 check_cancelled(OPERATION, cancellation)?;
1026 self.prepare_request(true)?;
1027 self.send_request(Request::OpenHostSession {
1028 project_root: config.project_root_string(),
1029 package: config.package().to_owned(),
1030 lib_name: config.lib_name().to_owned(),
1031 imports: config.imports().to_vec(),
1032 })?;
1033 self.record_request(true);
1034 match self.read_response_with_progress(OPERATION, progress, cancellation)? {
1035 Response::HostSessionOpened => Ok(()),
1036 other @ (Response::HealthOk
1037 | Response::CapabilityLoaded
1038 | Response::U64 { .. }
1039 | Response::Elaboration { .. }
1040 | Response::KernelCheck { .. }
1041 | Response::Strings { .. }
1042 | Response::StreamComplete { .. }
1043 | Response::StreamExportFailed { .. }
1044 | Response::StreamCallbackFailed { .. }
1045 | Response::StreamRowMalformed { .. }
1046 | Response::CapabilityMetadata { .. }
1047 | Response::CapabilityDoctor { .. }
1048 | Response::CapabilityMetadataMalformed { .. }
1049 | Response::CapabilityDoctorMalformed { .. }
1050 | Response::JsonCommand { .. }
1051 | Response::RowsComplete { .. }
1052 | Response::Terminating
1053 | Response::Error { .. }) => Err(unexpected_response(OPERATION, &other)),
1054 }
1055 }
1056
1057 pub(crate) fn worker_elaborate(
1058 &mut self,
1059 source: &str,
1060 options: &LeanWorkerElabOptions,
1061 cancellation: Option<&LeanWorkerCancellationToken>,
1062 progress: Option<&dyn LeanWorkerProgressSink>,
1063 ) -> Result<LeanWorkerElabResult, LeanWorkerError> {
1064 const OPERATION: &str = "worker_elaborate";
1065 check_cancelled(OPERATION, cancellation)?;
1066 self.prepare_request(false)?;
1067 self.send_request(Request::Elaborate {
1068 source: source.to_owned(),
1069 options: options.wire(),
1070 })?;
1071 self.record_request(false);
1072 match self.read_response_with_progress(OPERATION, progress, cancellation)? {
1073 Response::Elaboration { outcome } => Ok(outcome.into()),
1074 other @ (Response::HealthOk
1075 | Response::CapabilityLoaded
1076 | Response::U64 { .. }
1077 | Response::HostSessionOpened
1078 | Response::KernelCheck { .. }
1079 | Response::Strings { .. }
1080 | Response::StreamComplete { .. }
1081 | Response::StreamExportFailed { .. }
1082 | Response::StreamCallbackFailed { .. }
1083 | Response::StreamRowMalformed { .. }
1084 | Response::CapabilityMetadata { .. }
1085 | Response::CapabilityDoctor { .. }
1086 | Response::CapabilityMetadataMalformed { .. }
1087 | Response::CapabilityDoctorMalformed { .. }
1088 | Response::JsonCommand { .. }
1089 | Response::RowsComplete { .. }
1090 | Response::Terminating
1091 | Response::Error { .. }) => Err(unexpected_response(OPERATION, &other)),
1092 }
1093 }
1094
1095 pub(crate) fn worker_kernel_check(
1096 &mut self,
1097 source: &str,
1098 options: &LeanWorkerElabOptions,
1099 cancellation: Option<&LeanWorkerCancellationToken>,
1100 progress: Option<&dyn LeanWorkerProgressSink>,
1101 ) -> Result<LeanWorkerKernelResult, LeanWorkerError> {
1102 const OPERATION: &str = "worker_kernel_check";
1103 check_cancelled(OPERATION, cancellation)?;
1104 self.prepare_request(false)?;
1105 self.send_request(Request::KernelCheck {
1106 source: source.to_owned(),
1107 options: options.wire(),
1108 progress: progress.is_some(),
1109 })?;
1110 self.record_request(false);
1111 match self.read_response_with_progress(OPERATION, progress, cancellation)? {
1112 Response::KernelCheck { outcome } => Ok(outcome.into()),
1113 other @ (Response::HealthOk
1114 | Response::CapabilityLoaded
1115 | Response::U64 { .. }
1116 | Response::HostSessionOpened
1117 | Response::Elaboration { .. }
1118 | Response::Strings { .. }
1119 | Response::StreamComplete { .. }
1120 | Response::StreamExportFailed { .. }
1121 | Response::StreamCallbackFailed { .. }
1122 | Response::StreamRowMalformed { .. }
1123 | Response::CapabilityMetadata { .. }
1124 | Response::CapabilityDoctor { .. }
1125 | Response::CapabilityMetadataMalformed { .. }
1126 | Response::CapabilityDoctorMalformed { .. }
1127 | Response::JsonCommand { .. }
1128 | Response::RowsComplete { .. }
1129 | Response::Terminating
1130 | Response::Error { .. }) => Err(unexpected_response(OPERATION, &other)),
1131 }
1132 }
1133
1134 pub(crate) fn worker_declaration_kinds(
1135 &mut self,
1136 names: &[&str],
1137 cancellation: Option<&LeanWorkerCancellationToken>,
1138 progress: Option<&dyn LeanWorkerProgressSink>,
1139 ) -> Result<Vec<String>, LeanWorkerError> {
1140 const OPERATION: &str = "worker_declaration_kinds";
1141 check_cancelled(OPERATION, cancellation)?;
1142 self.prepare_request(false)?;
1143 self.send_request(Request::DeclarationKinds {
1144 names: names.iter().map(|name| (*name).to_owned()).collect(),
1145 progress: progress.is_some(),
1146 })?;
1147 self.record_request(false);
1148 match self.read_response_with_progress(OPERATION, progress, cancellation)? {
1149 Response::Strings { values } => Ok(values),
1150 other @ (Response::HealthOk
1151 | Response::CapabilityLoaded
1152 | Response::U64 { .. }
1153 | Response::HostSessionOpened
1154 | Response::Elaboration { .. }
1155 | Response::KernelCheck { .. }
1156 | Response::StreamComplete { .. }
1157 | Response::StreamExportFailed { .. }
1158 | Response::StreamCallbackFailed { .. }
1159 | Response::StreamRowMalformed { .. }
1160 | Response::CapabilityMetadata { .. }
1161 | Response::CapabilityDoctor { .. }
1162 | Response::CapabilityMetadataMalformed { .. }
1163 | Response::CapabilityDoctorMalformed { .. }
1164 | Response::JsonCommand { .. }
1165 | Response::RowsComplete { .. }
1166 | Response::Terminating
1167 | Response::Error { .. }) => Err(unexpected_response(OPERATION, &other)),
1168 }
1169 }
1170
1171 pub(crate) fn worker_declaration_names(
1172 &mut self,
1173 names: &[&str],
1174 cancellation: Option<&LeanWorkerCancellationToken>,
1175 progress: Option<&dyn LeanWorkerProgressSink>,
1176 ) -> Result<Vec<String>, LeanWorkerError> {
1177 const OPERATION: &str = "worker_declaration_names";
1178 check_cancelled(OPERATION, cancellation)?;
1179 self.prepare_request(false)?;
1180 self.send_request(Request::DeclarationNames {
1181 names: names.iter().map(|name| (*name).to_owned()).collect(),
1182 progress: progress.is_some(),
1183 })?;
1184 self.record_request(false);
1185 match self.read_response_with_progress(OPERATION, progress, cancellation)? {
1186 Response::Strings { values } => Ok(values),
1187 other @ (Response::HealthOk
1188 | Response::CapabilityLoaded
1189 | Response::U64 { .. }
1190 | Response::HostSessionOpened
1191 | Response::Elaboration { .. }
1192 | Response::KernelCheck { .. }
1193 | Response::StreamComplete { .. }
1194 | Response::StreamExportFailed { .. }
1195 | Response::StreamCallbackFailed { .. }
1196 | Response::StreamRowMalformed { .. }
1197 | Response::CapabilityMetadata { .. }
1198 | Response::CapabilityDoctor { .. }
1199 | Response::CapabilityMetadataMalformed { .. }
1200 | Response::CapabilityDoctorMalformed { .. }
1201 | Response::JsonCommand { .. }
1202 | Response::RowsComplete { .. }
1203 | Response::Terminating
1204 | Response::Error { .. }) => Err(unexpected_response(OPERATION, &other)),
1205 }
1206 }
1207
1208 pub(crate) fn worker_run_data_stream(
1209 &mut self,
1210 export: &str,
1211 request: &serde_json::Value,
1212 rows: &dyn LeanWorkerDataSink,
1213 diagnostics: Option<&dyn LeanWorkerDiagnosticSink>,
1214 cancellation: Option<&LeanWorkerCancellationToken>,
1215 progress: Option<&dyn LeanWorkerProgressSink>,
1216 ) -> Result<LeanWorkerStreamSummary, LeanWorkerError> {
1217 self.worker_run_data_stream_with_sink(
1218 export,
1219 request,
1220 LeanWorkerDataSinkTarget::Value(rows),
1221 diagnostics,
1222 cancellation,
1223 progress,
1224 )
1225 }
1226
1227 pub(crate) fn worker_run_data_stream_raw(
1228 &mut self,
1229 export: &str,
1230 request: &serde_json::Value,
1231 rows: &dyn LeanWorkerRawDataSink,
1232 diagnostics: Option<&dyn LeanWorkerDiagnosticSink>,
1233 cancellation: Option<&LeanWorkerCancellationToken>,
1234 progress: Option<&dyn LeanWorkerProgressSink>,
1235 ) -> Result<LeanWorkerStreamSummary, LeanWorkerError> {
1236 self.worker_run_data_stream_with_sink(
1237 export,
1238 request,
1239 LeanWorkerDataSinkTarget::Raw(rows),
1240 diagnostics,
1241 cancellation,
1242 progress,
1243 )
1244 }
1245
1246 fn worker_run_data_stream_with_sink(
1247 &mut self,
1248 export: &str,
1249 request: &serde_json::Value,
1250 rows: LeanWorkerDataSinkTarget<'_>,
1251 diagnostics: Option<&dyn LeanWorkerDiagnosticSink>,
1252 cancellation: Option<&LeanWorkerCancellationToken>,
1253 progress: Option<&dyn LeanWorkerProgressSink>,
1254 ) -> Result<LeanWorkerStreamSummary, LeanWorkerError> {
1255 const OPERATION: &str = "worker_run_data_stream";
1256 check_cancelled(OPERATION, cancellation)?;
1257 let request_json = serde_json::to_string(request).map_err(|err| LeanWorkerError::Protocol {
1258 message: format!("worker data-stream request JSON encode failed: {err}"),
1259 })?;
1260 self.prepare_request(false)?;
1261 self.send_request(Request::RunDataStream {
1262 export: export.to_owned(),
1263 request_json,
1264 progress: progress.is_some(),
1265 })?;
1266 self.record_request(false);
1267 self.stats.stream_requests = self.stats.stream_requests.saturating_add(1);
1268 match self.read_response_with_events(OPERATION, progress, cancellation, Some(rows), diagnostics)? {
1269 Response::StreamComplete { summary } => Ok(summary.into()),
1270 Response::StreamExportFailed { status_byte } => {
1271 Err(LeanWorkerError::StreamExportFailed { status: status_byte })
1272 }
1273 Response::StreamCallbackFailed {
1274 status_byte,
1275 description,
1276 } => Err(LeanWorkerError::StreamCallbackFailed {
1277 status: status_byte,
1278 description,
1279 }),
1280 Response::StreamRowMalformed { message } => Err(LeanWorkerError::StreamRowMalformed { message }),
1281 other @ (Response::HealthOk
1282 | Response::CapabilityLoaded
1283 | Response::U64 { .. }
1284 | Response::HostSessionOpened
1285 | Response::Elaboration { .. }
1286 | Response::KernelCheck { .. }
1287 | Response::Strings { .. }
1288 | Response::RowsComplete { .. }
1289 | Response::CapabilityMetadata { .. }
1290 | Response::CapabilityDoctor { .. }
1291 | Response::CapabilityMetadataMalformed { .. }
1292 | Response::CapabilityDoctorMalformed { .. }
1293 | Response::JsonCommand { .. }
1294 | Response::Terminating
1295 | Response::Error { .. }) => Err(unexpected_response(OPERATION, &other)),
1296 }
1297 }
1298
1299 pub(crate) fn worker_capability_metadata(
1300 &mut self,
1301 export: &str,
1302 request: &serde_json::Value,
1303 cancellation: Option<&LeanWorkerCancellationToken>,
1304 progress: Option<&dyn LeanWorkerProgressSink>,
1305 ) -> Result<LeanWorkerCapabilityMetadata, LeanWorkerError> {
1306 const OPERATION: &str = "worker_capability_metadata";
1307 check_cancelled(OPERATION, cancellation)?;
1308 let request_json = serde_json::to_string(request).map_err(|err| LeanWorkerError::Protocol {
1309 message: format!("worker capability metadata request JSON encode failed: {err}"),
1310 })?;
1311 self.prepare_request(false)?;
1312 self.send_request(Request::CapabilityMetadata {
1313 export: export.to_owned(),
1314 request_json,
1315 })?;
1316 self.record_request(false);
1317 match self.read_response_with_progress(OPERATION, progress, cancellation)? {
1318 Response::CapabilityMetadata { metadata } => Ok(metadata.into()),
1319 Response::CapabilityMetadataMalformed { message } => {
1320 Err(LeanWorkerError::CapabilityMetadataMalformed { message })
1321 }
1322 other @ (Response::HealthOk
1323 | Response::CapabilityLoaded
1324 | Response::U64 { .. }
1325 | Response::HostSessionOpened
1326 | Response::Elaboration { .. }
1327 | Response::KernelCheck { .. }
1328 | Response::Strings { .. }
1329 | Response::StreamComplete { .. }
1330 | Response::StreamExportFailed { .. }
1331 | Response::StreamCallbackFailed { .. }
1332 | Response::StreamRowMalformed { .. }
1333 | Response::CapabilityDoctor { .. }
1334 | Response::CapabilityDoctorMalformed { .. }
1335 | Response::JsonCommand { .. }
1336 | Response::RowsComplete { .. }
1337 | Response::Terminating
1338 | Response::Error { .. }) => Err(unexpected_response(OPERATION, &other)),
1339 }
1340 }
1341
1342 pub(crate) fn worker_capability_doctor(
1343 &mut self,
1344 export: &str,
1345 request: &serde_json::Value,
1346 cancellation: Option<&LeanWorkerCancellationToken>,
1347 progress: Option<&dyn LeanWorkerProgressSink>,
1348 ) -> Result<LeanWorkerDoctorReport, LeanWorkerError> {
1349 const OPERATION: &str = "worker_capability_doctor";
1350 check_cancelled(OPERATION, cancellation)?;
1351 let request_json = serde_json::to_string(request).map_err(|err| LeanWorkerError::Protocol {
1352 message: format!("worker capability doctor request JSON encode failed: {err}"),
1353 })?;
1354 self.prepare_request(false)?;
1355 self.send_request(Request::CapabilityDoctor {
1356 export: export.to_owned(),
1357 request_json,
1358 })?;
1359 self.record_request(false);
1360 match self.read_response_with_progress(OPERATION, progress, cancellation)? {
1361 Response::CapabilityDoctor { report } => Ok(report.into()),
1362 Response::CapabilityDoctorMalformed { message } => {
1363 Err(LeanWorkerError::CapabilityDoctorMalformed { message })
1364 }
1365 other @ (Response::HealthOk
1366 | Response::CapabilityLoaded
1367 | Response::U64 { .. }
1368 | Response::HostSessionOpened
1369 | Response::Elaboration { .. }
1370 | Response::KernelCheck { .. }
1371 | Response::Strings { .. }
1372 | Response::StreamComplete { .. }
1373 | Response::StreamExportFailed { .. }
1374 | Response::StreamCallbackFailed { .. }
1375 | Response::StreamRowMalformed { .. }
1376 | Response::CapabilityMetadata { .. }
1377 | Response::CapabilityMetadataMalformed { .. }
1378 | Response::JsonCommand { .. }
1379 | Response::RowsComplete { .. }
1380 | Response::Terminating
1381 | Response::Error { .. }) => Err(unexpected_response(OPERATION, &other)),
1382 }
1383 }
1384
1385 pub(crate) fn worker_json_command(
1386 &mut self,
1387 export: &str,
1388 request_json: String,
1389 cancellation: Option<&LeanWorkerCancellationToken>,
1390 progress: Option<&dyn LeanWorkerProgressSink>,
1391 ) -> Result<String, LeanWorkerError> {
1392 const OPERATION: &str = "worker_json_command";
1393 check_cancelled(OPERATION, cancellation)?;
1394 self.prepare_request(false)?;
1395 self.send_request(Request::JsonCommand {
1396 export: export.to_owned(),
1397 request_json,
1398 })?;
1399 self.record_request(false);
1400 match self.read_response_with_progress(OPERATION, progress, cancellation)? {
1401 Response::JsonCommand { response_json } => Ok(response_json),
1402 other @ (Response::HealthOk
1403 | Response::CapabilityLoaded
1404 | Response::U64 { .. }
1405 | Response::HostSessionOpened
1406 | Response::Elaboration { .. }
1407 | Response::KernelCheck { .. }
1408 | Response::Strings { .. }
1409 | Response::StreamComplete { .. }
1410 | Response::StreamExportFailed { .. }
1411 | Response::StreamCallbackFailed { .. }
1412 | Response::StreamRowMalformed { .. }
1413 | Response::CapabilityMetadata { .. }
1414 | Response::CapabilityDoctor { .. }
1415 | Response::CapabilityMetadataMalformed { .. }
1416 | Response::CapabilityDoctorMalformed { .. }
1417 | Response::RowsComplete { .. }
1418 | Response::Terminating
1419 | Response::Error { .. }) => Err(unexpected_response(OPERATION, &other)),
1420 }
1421 }
1422
1423 fn send_request(&mut self, request: Request) -> Result<(), LeanWorkerError> {
1424 self.ensure_running()?;
1425 let Some(stdin) = self.stdin.as_mut() else {
1426 return Err(self.dead_error());
1427 };
1428 write_frame(stdin, Message::Request(request)).map_err(|err| LeanWorkerError::Protocol {
1429 message: err.to_string(),
1430 })
1431 }
1432
1433 fn prepare_request(&mut self, import_like: bool) -> Result<(), LeanWorkerError> {
1434 self.ensure_running()?;
1435
1436 if let Some(limit) = self.config.restart_policy.max_requests
1437 && self.requests_since_restart >= limit
1438 {
1439 return self.restart_with_reason(LeanWorkerRestartReason::MaxRequests { limit });
1440 }
1441
1442 if import_like
1443 && let Some(limit) = self.config.restart_policy.max_imports
1444 && self.imports_since_restart >= limit
1445 {
1446 return self.restart_with_reason(LeanWorkerRestartReason::MaxImports { limit });
1447 }
1448
1449 if let Some(limit_kib) = self.config.restart_policy.max_rss_kib {
1450 match self.child_rss_kib() {
1451 Some(current_kib) if current_kib >= limit_kib => {
1452 self.stats.last_rss_kib = Some(current_kib);
1453 return self.restart_with_reason(LeanWorkerRestartReason::RssCeiling { current_kib, limit_kib });
1454 }
1455 Some(current_kib) => {
1456 self.stats.last_rss_kib = Some(current_kib);
1457 }
1458 None => {
1459 self.stats.rss_samples_unavailable = self.stats.rss_samples_unavailable.saturating_add(1);
1460 }
1461 }
1462 }
1463
1464 if let Some(limit) = self.config.restart_policy.idle_restart_after {
1465 let idle_for = self.last_activity.elapsed();
1466 if idle_for >= limit {
1467 return self.restart_with_reason(LeanWorkerRestartReason::Idle { idle_for, limit });
1468 }
1469 }
1470
1471 Ok(())
1472 }
1473
1474 fn record_request(&mut self, import_like: bool) {
1475 self.stats.requests = self.stats.requests.saturating_add(1);
1476 self.requests_since_restart = self.requests_since_restart.saturating_add(1);
1477 if import_like {
1478 self.stats.imports = self.stats.imports.saturating_add(1);
1479 self.imports_since_restart = self.imports_since_restart.saturating_add(1);
1480 }
1481 self.last_activity = Instant::now();
1482 }
1483
1484 fn restart_with_reason(&mut self, reason: LeanWorkerRestartReason) -> Result<(), LeanWorkerError> {
1485 let config = self.config.clone();
1486 self.stop_existing_child()?;
1487 self.stats.record_restart(reason);
1488 self.requests_since_restart = 0;
1489 self.imports_since_restart = 0;
1490 let mut next = Self::spawn(&config)?;
1491 next.stats = self.stats.clone();
1492 next.last_activity = Instant::now();
1493 *self = next;
1494 Ok(())
1495 }
1496
1497 fn read_response(&mut self, operation: &'static str) -> Result<Response, LeanWorkerError> {
1498 self.read_response_with_events(operation, None, None, None, None)
1499 }
1500
1501 fn read_response_with_progress(
1502 &mut self,
1503 operation: &'static str,
1504 progress: Option<&dyn LeanWorkerProgressSink>,
1505 cancellation: Option<&LeanWorkerCancellationToken>,
1506 ) -> Result<Response, LeanWorkerError> {
1507 self.read_response_with_events(operation, progress, cancellation, None, None)
1508 }
1509
1510 fn read_response_with_events(
1511 &mut self,
1512 operation: &'static str,
1513 progress: Option<&dyn LeanWorkerProgressSink>,
1514 cancellation: Option<&LeanWorkerCancellationToken>,
1515 data: Option<LeanWorkerDataSinkTarget<'_>>,
1516 diagnostics: Option<&dyn LeanWorkerDiagnosticSink>,
1517 ) -> Result<Response, LeanWorkerError> {
1518 let started = Instant::now();
1519 let timeout = self.config.request_timeout;
1520 let deadline = started.checked_add(timeout);
1521 let streaming = data.is_some();
1522 let mut request_backpressure_waits = 0_u64;
1523 let stdout = self.stdout.take().ok_or_else(|| self.dead_error())?;
1524 let (sender, receiver) = mpsc::sync_channel(WORKER_EVENT_BUFFER_CAPACITY);
1525 let _reader = thread::spawn(move || read_request_messages(stdout, sender));
1526
1527 loop {
1528 let event = match deadline.and_then(|deadline| deadline.checked_duration_since(Instant::now())) {
1529 Some(remaining) if remaining.is_zero() => {
1530 if streaming {
1531 self.record_stream_failure(started, request_backpressure_waits);
1532 }
1533 self.restart_with_reason(LeanWorkerRestartReason::RequestTimeout {
1534 operation,
1535 duration: timeout,
1536 })?;
1537 return Err(LeanWorkerError::Timeout {
1538 operation,
1539 duration: timeout,
1540 });
1541 }
1542 Some(remaining) => match receiver.recv_timeout(remaining) {
1543 Ok(event) => event,
1544 Err(mpsc::RecvTimeoutError::Timeout) => {
1545 if streaming {
1546 self.record_stream_failure(started, request_backpressure_waits);
1547 }
1548 self.restart_with_reason(LeanWorkerRestartReason::RequestTimeout {
1549 operation,
1550 duration: timeout,
1551 })?;
1552 return Err(LeanWorkerError::Timeout {
1553 operation,
1554 duration: timeout,
1555 });
1556 }
1557 Err(mpsc::RecvTimeoutError::Disconnected) => {
1558 return Err(LeanWorkerError::Protocol {
1559 message: "worker response reader exited without a terminal response".to_owned(),
1560 });
1561 }
1562 },
1563 None => match receiver.recv() {
1564 Ok(event) => event,
1565 Err(_err) => {
1566 return Err(LeanWorkerError::Protocol {
1567 message: "worker response reader exited without a terminal response".to_owned(),
1568 });
1569 }
1570 },
1571 };
1572 request_backpressure_waits = request_backpressure_waits.saturating_add(event.backpressure_waits());
1573 self.stats.backpressure_waits = self.stats.backpressure_waits.saturating_add(event.backpressure_waits());
1574
1575 let message = match event {
1576 RequestReaderEvent::Message { message, .. } => message,
1577 RequestReaderEvent::Terminal { message, stdout, .. } => {
1578 self.stdout = Some(stdout);
1579 match message {
1580 Message::Response(Response::Error { code, message }) => {
1581 if streaming {
1582 self.record_stream_failure(started, request_backpressure_waits);
1583 }
1584 return Err(LeanWorkerError::Worker { code, message });
1585 }
1586 Message::Response(response) => {
1587 if streaming {
1588 if matches!(response, Response::StreamComplete { .. }) {
1589 self.record_stream_success(started);
1590 } else {
1591 self.record_stream_failure(started, request_backpressure_waits);
1592 }
1593 }
1594 return Ok(response);
1595 }
1596 other @ (Message::Handshake { .. }
1597 | Message::Request(_)
1598 | Message::Diagnostic(_)
1599 | Message::ProgressTick(_)
1600 | Message::DataRow(_)
1601 | Message::FatalExit(_)) => {
1602 return Err(LeanWorkerError::Protocol {
1603 message: format!("worker sent unexpected {operation} message: {other:?}"),
1604 });
1605 }
1606 }
1607 }
1608 RequestReaderEvent::ReadError { message, eof, .. } => {
1609 if streaming {
1610 self.record_stream_failure(started, request_backpressure_waits);
1611 }
1612 return if eof {
1613 Err(self.record_exit_error())
1614 } else {
1615 Err(LeanWorkerError::Protocol { message })
1616 };
1617 }
1618 };
1619
1620 match message {
1621 Message::ProgressTick(tick) => {
1622 if let Err(err) =
1623 report_parent_progress(progress, elapsed_event(tick.phase, tick.current, tick.total, started))
1624 {
1625 if streaming {
1626 self.record_stream_failure(started, request_backpressure_waits);
1627 }
1628 return Err(err);
1629 }
1630 if cancellation.is_some_and(LeanWorkerCancellationToken::is_cancelled) {
1631 if streaming {
1632 self.record_stream_failure(started, request_backpressure_waits);
1633 }
1634 self.restart_with_reason(LeanWorkerRestartReason::Cancelled { operation })?;
1635 return Err(LeanWorkerError::Cancelled { operation });
1636 }
1637 }
1638 Message::DataRow(row) => {
1639 let payload_bytes = row.payload.get().len() as u64;
1640 if let Err(err) = report_parent_data_row(data, row) {
1641 if streaming {
1642 self.record_stream_failure(started, request_backpressure_waits);
1643 }
1644 return Err(err);
1645 }
1646 self.stats.data_rows_delivered = self.stats.data_rows_delivered.saturating_add(1);
1647 self.stats.data_row_payload_bytes = self.stats.data_row_payload_bytes.saturating_add(payload_bytes);
1648 if cancellation.is_some_and(LeanWorkerCancellationToken::is_cancelled) {
1649 if streaming {
1650 self.record_stream_failure(started, request_backpressure_waits);
1651 }
1652 self.restart_with_reason(LeanWorkerRestartReason::Cancelled { operation })?;
1653 return Err(LeanWorkerError::Cancelled { operation });
1654 }
1655 }
1656 Message::Diagnostic(diagnostic) => {
1657 if let Err(err) = report_parent_diagnostic(diagnostics, diagnostic.into()) {
1658 if streaming {
1659 self.record_stream_failure(started, request_backpressure_waits);
1660 }
1661 return Err(err);
1662 }
1663 }
1664 Message::Response(response) => return Err(unexpected_response(operation, &response)),
1665 other @ (Message::Handshake { .. } | Message::Request(_) | Message::FatalExit(_)) => {
1666 return Err(LeanWorkerError::Protocol {
1667 message: format!("worker sent unexpected {operation} message: {other:?}"),
1668 });
1669 }
1670 }
1671 }
1672 }
1673
1674 fn ensure_running(&mut self) -> Result<(), LeanWorkerError> {
1675 match self.status()? {
1676 LeanWorkerStatus::Running => Ok(()),
1677 LeanWorkerStatus::Exited(exit) if exit.success => Err(LeanWorkerError::ChildExited { exit }),
1678 LeanWorkerStatus::Exited(exit) => Err(LeanWorkerError::ChildPanicOrAbort { exit }),
1679 }
1680 }
1681
1682 fn record_stream_success(&mut self, started: Instant) {
1683 self.stats.stream_successes = self.stats.stream_successes.saturating_add(1);
1684 self.stats.stream_elapsed = self.stats.stream_elapsed.saturating_add(started.elapsed());
1685 }
1686
1687 fn record_stream_failure(&mut self, started: Instant, backpressure_waits: u64) {
1688 self.stats.stream_failures = self.stats.stream_failures.saturating_add(1);
1689 self.stats.stream_elapsed = self.stats.stream_elapsed.saturating_add(started.elapsed());
1690 if backpressure_waits > 0 {
1691 self.stats.backpressure_failures = self.stats.backpressure_failures.saturating_add(1);
1692 }
1693 }
1694
1695 fn wait_for_exit(&mut self) -> Result<LeanWorkerExit, LeanWorkerError> {
1696 let Some(child) = self.child.as_mut() else {
1697 return Err(self.dead_error());
1698 };
1699 let status = child.wait().map_err(|source| LeanWorkerError::Wait { source })?;
1700 let diagnostics = self.read_stderr();
1701 let exit = LeanWorkerExit::from_status(status, diagnostics);
1702 self.last_exit = Some(exit.clone());
1703 self.child = None;
1704 self.stdin = None;
1705 self.stdout = None;
1706 self.stats.exits = self.stats.exits.saturating_add(1);
1707 Ok(exit)
1708 }
1709
1710 fn try_record_exit(&mut self) -> Option<LeanWorkerExit> {
1711 let child = self.child.as_mut()?;
1712 let status = child.try_wait().ok().flatten()?;
1713 let diagnostics = self.read_stderr();
1714 let exit = LeanWorkerExit::from_status(status, diagnostics);
1715 self.last_exit = Some(exit.clone());
1716 self.child = None;
1717 self.stdin = None;
1718 self.stdout = None;
1719 self.stats.exits = self.stats.exits.saturating_add(1);
1720 Some(exit)
1721 }
1722
1723 fn record_exit_error(&mut self) -> LeanWorkerError {
1724 match self.wait_for_exit() {
1725 Ok(exit) if exit.success => LeanWorkerError::ChildExited { exit },
1726 Ok(exit) => LeanWorkerError::ChildPanicOrAbort { exit },
1727 Err(err) => err,
1728 }
1729 }
1730
1731 fn stop_existing_child(&mut self) -> Result<(), LeanWorkerError> {
1732 if let Some(child) = self.child.as_mut() {
1733 drop(child.kill());
1734 let status = child.wait().map_err(|source| LeanWorkerError::Wait { source })?;
1735 let diagnostics = self.read_stderr();
1736 self.last_exit = Some(LeanWorkerExit::from_status(status, diagnostics));
1737 self.stats.exits = self.stats.exits.saturating_add(1);
1738 }
1739 self.child = None;
1740 self.stdin = None;
1741 self.stdout = None;
1742 Ok(())
1743 }
1744
1745 fn dead_error(&self) -> LeanWorkerError {
1746 let exit = self.last_exit.clone().unwrap_or_else(|| LeanWorkerExit {
1747 success: false,
1748 code: None,
1749 status: "worker is not running".to_owned(),
1750 diagnostics: String::new(),
1751 });
1752 if exit.success {
1753 LeanWorkerError::ChildExited { exit }
1754 } else {
1755 LeanWorkerError::ChildPanicOrAbort { exit }
1756 }
1757 }
1758
1759 fn read_stderr(&mut self) -> String {
1760 let mut diagnostics = String::new();
1761 if let Some(mut pipe) = self.stderr.take() {
1762 drop(pipe.read_to_string(&mut diagnostics));
1763 }
1764 diagnostics
1765 }
1766
1767 fn child_rss_kib(&mut self) -> Option<u64> {
1768 let child = self.child.as_mut()?;
1769 child_rss_kib(child.id())
1770 }
1771}
1772
1773enum RequestReaderEvent {
1774 Message {
1775 message: Message,
1776 backpressure_waits: u64,
1777 },
1778 Terminal {
1779 message: Message,
1780 stdout: BufReader<ChildStdout>,
1781 backpressure_waits: u64,
1782 },
1783 ReadError {
1784 message: String,
1785 eof: bool,
1786 backpressure_waits: u64,
1787 },
1788}
1789
1790impl RequestReaderEvent {
1791 fn backpressure_waits(&self) -> u64 {
1792 match self {
1793 Self::Message { backpressure_waits, .. }
1794 | Self::Terminal { backpressure_waits, .. }
1795 | Self::ReadError { backpressure_waits, .. } => *backpressure_waits,
1796 }
1797 }
1798
1799 fn add_backpressure_wait(&mut self) {
1800 match self {
1801 Self::Message { backpressure_waits, .. }
1802 | Self::Terminal { backpressure_waits, .. }
1803 | Self::ReadError { backpressure_waits, .. } => {
1804 *backpressure_waits = backpressure_waits.saturating_add(1);
1805 }
1806 }
1807 }
1808}
1809
1810#[allow(
1811 clippy::needless_pass_by_value,
1812 reason = "the request reader thread must own the sender"
1813)]
1814fn read_request_messages(mut stdout: BufReader<ChildStdout>, sender: mpsc::SyncSender<RequestReaderEvent>) {
1815 loop {
1816 match read_frame(&mut stdout) {
1817 Ok(frame) if matches!(frame.message, Message::Response(_)) => {
1818 let _ = send_reader_event(
1819 &sender,
1820 RequestReaderEvent::Terminal {
1821 message: frame.message,
1822 stdout,
1823 backpressure_waits: 0,
1824 },
1825 );
1826 return;
1827 }
1828 Ok(frame) => {
1829 if send_reader_event(
1830 &sender,
1831 RequestReaderEvent::Message {
1832 message: frame.message,
1833 backpressure_waits: 0,
1834 },
1835 )
1836 .is_err()
1837 {
1838 return;
1839 }
1840 }
1841 Err(err) => {
1842 let _ = send_reader_event(
1843 &sender,
1844 RequestReaderEvent::ReadError {
1845 message: err.to_string(),
1846 eof: err.is_eof(),
1847 backpressure_waits: 0,
1848 },
1849 );
1850 return;
1851 }
1852 }
1853 }
1854}
1855
1856fn send_reader_event(sender: &mpsc::SyncSender<RequestReaderEvent>, event: RequestReaderEvent) -> Result<(), ()> {
1857 match sender.try_send(event) {
1858 Ok(()) => Ok(()),
1859 Err(mpsc::TrySendError::Full(mut event)) => {
1860 event.add_backpressure_wait();
1861 sender.send(event).map_err(|_| ())
1862 }
1863 Err(mpsc::TrySendError::Disconnected(_event)) => Err(()),
1864 }
1865}
1866
1867impl Drop for LeanWorker {
1868 fn drop(&mut self) {
1869 if let Some(child) = self.child.as_mut() {
1870 drop(child.kill());
1871 drop(child.wait());
1872 }
1873 }
1874}
1875
1876fn expect_handshake(stdout: &mut BufReader<ChildStdout>) -> Result<LeanWorkerRuntimeMetadata, LeanWorkerError> {
1877 let frame = read_frame(stdout).map_err(|err| {
1878 if err.is_eof() {
1879 LeanWorkerError::Handshake {
1880 message: "child closed stdout before handshake".to_owned(),
1881 }
1882 } else {
1883 LeanWorkerError::Handshake {
1884 message: err.to_string(),
1885 }
1886 }
1887 })?;
1888 match frame.message {
1889 Message::Handshake {
1890 worker_version,
1891 protocol_version,
1892 } if protocol_version == crate::protocol::PROTOCOL_VERSION => Ok(LeanWorkerRuntimeMetadata {
1893 worker_version,
1894 protocol_version,
1895 lean_version: None,
1896 }),
1897 other @ (Message::Handshake { .. }
1898 | Message::Request(_)
1899 | Message::Response(_)
1900 | Message::Diagnostic(_)
1901 | Message::ProgressTick(_)
1902 | Message::DataRow(_)
1903 | Message::FatalExit(_)) => Err(LeanWorkerError::Handshake {
1904 message: format!("unexpected handshake frame: {other:?}"),
1905 }),
1906 }
1907}
1908
1909fn wait_with_stderr(child: &mut Child, stderr: Option<ChildStderr>) -> Result<LeanWorkerExit, LeanWorkerError> {
1910 let status = child.wait().map_err(|source| LeanWorkerError::Wait { source })?;
1911 let mut diagnostics = String::new();
1912 if let Some(mut pipe) = stderr {
1913 drop(pipe.read_to_string(&mut diagnostics));
1914 }
1915 Ok(LeanWorkerExit::from_status(status, diagnostics))
1916}
1917
1918fn unexpected_response(operation: &'static str, response: &Response) -> LeanWorkerError {
1919 LeanWorkerError::Protocol {
1920 message: format!("worker sent unexpected {operation} response: {response:?}"),
1921 }
1922}
1923
/// Renders `path` as an owned string, replacing any non-UTF-8 sequences with U+FFFD.
fn path_string(path: &Path) -> String {
    String::from(path.to_string_lossy())
}
1927
/// Linux: reads the process's resident set size, in KiB, from the `VmRSS:` line of
/// `/proc/<pid>/status`.
///
/// Returns `None` in these cases:
/// - the file cannot be read (e.g. the process is gone);
/// - no `VmRSS:` line carries a parsable number.
#[cfg(target_os = "linux")]
fn child_rss_kib(pid: u32) -> Option<u64> {
    let status = std::fs::read_to_string(format!("/proc/{pid}/status")).ok()?;
    for line in status.lines() {
        let Some(rest) = line.strip_prefix("VmRSS:") else {
            continue;
        };
        if let Some(kib) = rest.split_whitespace().next().and_then(|field| field.parse::<u64>().ok()) {
            return Some(kib);
        }
    }
    None
}
1936
/// Non-Linux: asks `ps` for the process's resident set size, in KiB.
///
/// Returns `None` in these cases:
/// - `ps` cannot be run, or exits unsuccessfully;
/// - its output is not a number;
/// - the reported size is zero.
#[cfg(not(target_os = "linux"))]
fn child_rss_kib(pid: u32) -> Option<u64> {
    let pid_arg = pid.to_string();
    let output = Command::new("ps")
        .args(["-o", "rss=", "-p", pid_arg.as_str()])
        .output()
        .ok()?;
    if !output.status.success() {
        return None;
    }
    let rss = String::from_utf8_lossy(&output.stdout).trim().parse::<u64>().ok()?;
    (rss > 0).then_some(rss)
}