1use std::ffi::OsString;
2use std::fmt;
3use std::io::{BufReader, BufWriter, Read as _};
4use std::path::{Path, PathBuf};
5use std::process::{Child, ChildStderr, ChildStdin, ChildStdout, Command, ExitStatus, Stdio};
6use std::sync::mpsc;
7use std::thread;
8use std::time::{Duration, Instant};
9
10use crate::capability::LeanWorkerBootstrapDiagnosticCode;
11use crate::protocol::{Message, Request, Response, read_frame, write_frame};
12use crate::session::LeanWorkerDataSinkTarget;
13use crate::session::{
14 LeanWorkerCancellationToken, LeanWorkerCapabilityMetadata, LeanWorkerDataSink, LeanWorkerDiagnosticSink,
15 LeanWorkerDoctorReport, LeanWorkerElabOptions, LeanWorkerElabResult, LeanWorkerKernelResult,
16 LeanWorkerProgressSink, LeanWorkerRawDataSink, LeanWorkerRuntimeMetadata, LeanWorkerSessionConfig,
17 LeanWorkerStreamSummary, check_cancelled, elapsed_event, report_parent_data_row, report_parent_diagnostic,
18 report_parent_progress,
19};
20
/// How long `LeanWorker::spawn` waits for the child's startup handshake unless
/// the configuration overrides it (ten seconds).
const DEFAULT_STARTUP_TIMEOUT: Duration = Duration::from_millis(10_000);
/// Capacity for the worker event buffer. Its consumer is outside this excerpt —
/// TODO confirm exact use.
const WORKER_EVENT_BUFFER_CAPACITY: usize = 64;

/// Request timeout applied by `LeanWorkerConfig::new` (thirty seconds).
pub const LEAN_WORKER_REQUEST_TIMEOUT_DEFAULT: Duration = Duration::from_secs(30);

/// Request timeout selected by `LeanWorkerConfig::long_running_requests`
/// (ten minutes).
pub const LEAN_WORKER_REQUEST_TIMEOUT_LONG_RUNNING: Duration = Duration::from_secs(10 * 60);
29
/// Settings used by `LeanWorker::spawn` to launch a worker child process.
///
/// Start from [`LeanWorkerConfig::new`] and refine it with the chained setters.
#[derive(Clone, Debug)]
pub struct LeanWorkerConfig {
    /// Path of the worker executable handed to `Command::new`.
    executable: PathBuf,
    /// Working directory for the child; only applied when `Some`.
    current_dir: Option<PathBuf>,
    /// Extra environment variables, applied in insertion order after the
    /// built-in `LEAN_ABORT_ON_PANIC` / `RUST_BACKTRACE` settings.
    env: Vec<(OsString, OsString)>,
    /// Upper bound on waiting for the startup handshake.
    startup_timeout: Duration,
    /// Per-request timeout for the spawned worker.
    request_timeout: Duration,
    /// Restart policy for the worker. It is enforced outside this excerpt;
    /// TODO confirm where.
    restart_policy: LeanWorkerRestartPolicy,
}
45
46impl LeanWorkerConfig {
47 pub fn new(executable: impl Into<PathBuf>) -> Self {
49 Self {
50 executable: executable.into(),
51 current_dir: None,
52 env: Vec::new(),
53 startup_timeout: DEFAULT_STARTUP_TIMEOUT,
54 request_timeout: LEAN_WORKER_REQUEST_TIMEOUT_DEFAULT,
55 restart_policy: LeanWorkerRestartPolicy::default(),
56 }
57 }
58
59 pub fn executable(&self) -> &Path {
61 &self.executable
62 }
63
64 #[must_use]
66 pub fn current_dir(mut self, path: impl Into<PathBuf>) -> Self {
67 self.current_dir = Some(path.into());
68 self
69 }
70
71 #[must_use]
73 pub fn env(mut self, key: impl Into<OsString>, value: impl Into<OsString>) -> Self {
74 self.env.push((key.into(), value.into()));
75 self
76 }
77
78 #[must_use]
80 pub fn startup_timeout(mut self, timeout: Duration) -> Self {
81 self.startup_timeout = timeout;
82 self
83 }
84
85 #[must_use]
91 pub fn request_timeout(mut self, timeout: Duration) -> Self {
92 self.request_timeout = timeout;
93 self
94 }
95
96 #[must_use]
98 pub fn long_running_requests(mut self) -> Self {
99 self.request_timeout = LEAN_WORKER_REQUEST_TIMEOUT_LONG_RUNNING;
100 self
101 }
102
103 #[must_use]
109 pub fn restart_policy(mut self, policy: LeanWorkerRestartPolicy) -> Self {
110 self.restart_policy = policy;
111 self
112 }
113}
114
/// Limits that decide when a worker child should be recycled.
///
/// Every limit starts unset, so [`LeanWorkerRestartPolicy::disabled`] equals
/// `Default::default()`. The count and RSS limits are clamped to at least 1.
/// The idle duration is stored exactly as given.
#[derive(Clone, Debug, Default, Eq, PartialEq)]
pub struct LeanWorkerRestartPolicy {
    /// Request-count limit, if any.
    max_requests: Option<u64>,
    /// Import-count limit, if any.
    max_imports: Option<u64>,
    /// Resident-set-size ceiling in KiB, if any.
    max_rss_kib: Option<u64>,
    /// Idle duration after which a restart is due, if any.
    idle_restart_after: Option<Duration>,
}

impl LeanWorkerRestartPolicy {
    /// Returns a policy with every restart trigger switched off.
    #[must_use]
    pub fn disabled() -> Self {
        Self {
            max_requests: None,
            max_imports: None,
            max_rss_kib: None,
            idle_restart_after: None,
        }
    }

    /// Sets the request-count limit; a `limit` of 0 is stored as 1.
    #[must_use]
    pub fn max_requests(self, limit: u64) -> Self {
        Self {
            max_requests: Some(limit.max(1)),
            ..self
        }
    }

    /// Sets the import-count limit; a `limit` of 0 is stored as 1.
    #[must_use]
    pub fn max_imports(self, limit: u64) -> Self {
        Self {
            max_imports: Some(limit.max(1)),
            ..self
        }
    }

    /// Sets the RSS ceiling in KiB; a `limit` of 0 is stored as 1.
    #[must_use]
    pub fn max_rss_kib(self, limit: u64) -> Self {
        Self {
            max_rss_kib: Some(limit.max(1)),
            ..self
        }
    }

    /// Sets the idle duration after which a restart is due (not clamped).
    #[must_use]
    pub fn idle_restart_after(self, duration: Duration) -> Self {
        Self {
            idle_restart_after: Some(duration),
            ..self
        }
    }
}
168
/// Why a worker child was replaced by a fresh process.
#[derive(Clone, Debug, Eq, PartialEq)]
pub enum LeanWorkerRestartReason {
    /// A caller asked for the restart (`LeanWorker::cycle` / `restart`).
    Explicit,
    /// Restart attributed to the request-count limit.
    MaxRequests { limit: u64 },
    /// Restart attributed to the import-count limit.
    MaxImports { limit: u64 },
    /// Restart attributed to the RSS ceiling; both values are in KiB.
    RssCeiling { current_kib: u64, limit_kib: u64 },
    /// Restart attributed to the idle limit.
    Idle { idle_for: Duration, limit: Duration },
    /// Restart after `operation` was cancelled.
    Cancelled { operation: &'static str },
    /// Restart after `operation` exceeded its timeout.
    RequestTimeout {
        /// Operation that timed out.
        operation: &'static str,
        /// Timeout that was exceeded.
        duration: Duration,
    },
}

/// Cumulative counters for a worker.
///
/// `record_restart` maintains the restart counters and `last_restart_reason`.
/// The other fields are updated by code elsewhere; for example,
/// `LeanWorker::rss_kib` sets `last_rss_kib` and `rss_samples_unavailable`,
/// and `LeanWorker::status` bumps `exits`. All counters saturate.
#[derive(Clone, Debug, Default, Eq, PartialEq)]
pub struct LeanWorkerStats {
    /// Requests counted.
    pub requests: u64,
    /// Import requests counted.
    pub imports: u64,
    /// Child exits observed.
    pub exits: u64,
    /// Restarts of any kind.
    pub restarts: u64,
    /// Restarts with reason [`LeanWorkerRestartReason::Explicit`].
    pub explicit_cycles: u64,
    /// Restarts with reason [`LeanWorkerRestartReason::MaxRequests`].
    pub max_request_restarts: u64,
    /// Restarts with reason [`LeanWorkerRestartReason::MaxImports`].
    pub max_import_restarts: u64,
    /// Restarts with reason [`LeanWorkerRestartReason::RssCeiling`].
    pub rss_restarts: u64,
    /// Restarts with reason [`LeanWorkerRestartReason::Idle`].
    pub idle_restarts: u64,
    /// Restarts with reason [`LeanWorkerRestartReason::Cancelled`].
    pub cancelled_restarts: u64,
    /// Restarts with reason [`LeanWorkerRestartReason::RequestTimeout`].
    pub timeout_restarts: u64,
    /// RSS samples that could not be taken.
    pub rss_samples_unavailable: u64,
    /// Most recent successful RSS sample in KiB.
    pub last_rss_kib: Option<u64>,
    /// Reason passed to the most recent `record_restart`.
    pub last_restart_reason: Option<LeanWorkerRestartReason>,
    /// Streaming requests counted.
    pub stream_requests: u64,
    /// Streaming requests that succeeded.
    pub stream_successes: u64,
    /// Streaming requests that failed.
    pub stream_failures: u64,
    /// Data rows delivered to sinks.
    pub data_rows_delivered: u64,
    /// Payload bytes across delivered data rows.
    pub data_row_payload_bytes: u64,
    /// Total time spent in streaming requests.
    pub stream_elapsed: Duration,
    /// Times delivery waited on backpressure.
    pub backpressure_waits: u64,
    /// Times backpressure handling failed.
    pub backpressure_failures: u64,
}

impl LeanWorkerStats {
    /// Counts one restart in the total and in the counter for `reason`, then
    /// remembers `reason` as the latest one.
    fn record_restart(&mut self, reason: LeanWorkerRestartReason) {
        let per_reason = match &reason {
            LeanWorkerRestartReason::Explicit => &mut self.explicit_cycles,
            LeanWorkerRestartReason::MaxRequests { .. } => &mut self.max_request_restarts,
            LeanWorkerRestartReason::MaxImports { .. } => &mut self.max_import_restarts,
            LeanWorkerRestartReason::RssCeiling { .. } => &mut self.rss_restarts,
            LeanWorkerRestartReason::Idle { .. } => &mut self.idle_restarts,
            LeanWorkerRestartReason::Cancelled { .. } => &mut self.cancelled_restarts,
            LeanWorkerRestartReason::RequestTimeout { .. } => &mut self.timeout_restarts,
        };
        *per_reason = per_reason.saturating_add(1);
        self.restarts = self.restarts.saturating_add(1);
        self.last_restart_reason = Some(reason);
    }
}
268}
269
/// Liveness of a worker child as reported by `LeanWorker::status`.
#[derive(Clone, Debug, Eq, PartialEq)]
pub enum LeanWorkerStatus {
    /// A non-blocking wait found no exit status yet.
    Running,
    /// The child exited, or no child is attached; the payload describes it.
    Exited(LeanWorkerExit),
}

/// Snapshot of how a worker child process ended.
#[derive(Clone, Debug, Eq, PartialEq)]
pub struct LeanWorkerExit {
    /// `true` when the exit status reports success.
    pub success: bool,
    /// Numeric exit code, when the platform provides one.
    pub code: Option<i32>,
    /// Display form of the exit status (or a synthetic description).
    pub status: String,
    /// Diagnostic text captured from the child (its stderr when built by `status`).
    pub diagnostics: String,
}

impl LeanWorkerExit {
    /// Captures `status` together with the already-collected `diagnostics`.
    fn from_status(status: ExitStatus, diagnostics: String) -> Self {
        let (success, code) = (status.success(), status.code());
        let status = status.to_string();
        Self {
            success,
            code,
            status,
            diagnostics,
        }
    }
}
302
/// Errors produced while spawning, driving, or pooling worker child processes.
#[derive(Debug)]
pub enum LeanWorkerError {
    /// The worker executable could not be started.
    Spawn {
        /// Executable passed to `Command::new`.
        executable: PathBuf,
        /// OS error returned by `Command::spawn`.
        source: std::io::Error,
    },
    /// No worker child binary could be located.
    WorkerChildUnresolved {
        /// Candidate paths that were checked.
        tried: Vec<PathBuf>,
    },
    /// A located worker child path is not executable; `reason` says why.
    WorkerChildNotExecutable { path: PathBuf, reason: String },
    /// A bootstrap check identified by `code` failed.
    Bootstrap {
        /// Which bootstrap check failed.
        code: LeanWorkerBootstrapDiagnosticCode,
        message: String,
    },
    /// Building the worker capability Lake target failed.
    CapabilityBuild {
        /// Build diagnostics; also exposed via `Error::source`.
        diagnostic: lean_toolchain::LinkDiagnostics,
    },
    /// Child process setup failed (e.g. a stdio pipe was unavailable).
    Setup { message: String },
    /// The startup handshake failed.
    Handshake { message: String },
    /// Communication with the worker violated the protocol.
    Protocol { message: String },
    /// The worker reported an error with its own `code` and `message`.
    Worker { code: String, message: String },
    /// The worker child exited; `exit` holds its status and diagnostics.
    ChildExited { exit: LeanWorkerExit },
    /// The worker child exited with a non-success status (e.g. a panic or abort).
    ChildPanicOrAbort { exit: LeanWorkerExit },
    /// `operation` did not finish within `duration`.
    Timeout {
        /// Operation that timed out.
        operation: &'static str,
        /// Time limit that was exceeded.
        duration: Duration,
    },
    /// `operation` was cancelled through a cancellation token.
    Cancelled { operation: &'static str },
    /// A caller-supplied progress sink panicked.
    ProgressPanic { message: String },
    /// A caller-supplied data sink panicked.
    DataSinkPanic { message: String },
    /// A caller-supplied diagnostic sink panicked.
    DiagnosticSinkPanic { message: String },
    /// A streaming export returned the given status code.
    StreamExportFailed { status: u8 },
    /// A streaming callback failed with `status`; `description` explains it.
    StreamCallbackFailed { status: u8, description: String },
    /// A streaming export emitted a row that could not be parsed.
    StreamRowMalformed { message: String },
    /// The capability metadata export returned malformed JSON.
    CapabilityMetadataMalformed { message: String },
    /// Capability metadata from `export` differed from what was expected.
    CapabilityMetadataMismatch {
        /// Export that produced the metadata.
        export: String,
        /// Metadata the caller expected.
        expected: Box<LeanWorkerCapabilityMetadata>,
        /// Metadata actually reported.
        actual: Box<LeanWorkerCapabilityMetadata>,
    },
    /// The capability doctor export returned malformed JSON.
    CapabilityDoctorMalformed { message: String },
    /// Encoding the JSON request for typed command `export` failed.
    TypedCommandRequestEncode { export: String, message: String },
    /// Decoding the JSON response of typed command `export` failed.
    TypedCommandResponseDecode { export: String, message: String },
    /// Decoding a streamed row of typed command `export` failed.
    TypedCommandRowDecode {
        /// Typed command export name.
        export: String,
        /// Stream the row belonged to.
        stream: String,
        /// Sequence number of the offending row.
        sequence: u64,
        message: String,
    },
    /// Decoding the terminal summary of typed command `export` failed.
    TypedCommandSummaryDecode { export: String, message: String },
    /// A worker pool lease is no longer valid; `reason` explains why.
    LeaseInvalidated { reason: String },
    /// The worker pool cannot admit another session key (`max_workers` reached).
    WorkerPoolExhausted { max_workers: usize },
    /// Admitting more work would exceed the pool's RSS budget (values in KiB).
    WorkerPoolMemoryBudgetExceeded { current_kib: u64, limit_kib: u64 },
    /// Waiting for pool admission took longer than allowed (`waited`).
    WorkerPoolQueueTimeout { waited: Duration },
    /// The worker does not support `operation`.
    UnsupportedRequest { operation: &'static str },
    /// Waiting on (or killing) the worker child failed.
    Wait { source: std::io::Error },
}
395
/// Single-line, human-readable descriptions for every [`LeanWorkerError`].
///
/// Wrapped I/O and build errors are interpolated into the message. They remain
/// reachable through `std::error::Error::source` as well.
impl fmt::Display for LeanWorkerError {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        match self {
            Self::Spawn { executable, source } => {
                write!(f, "failed to spawn worker {}: {source}", executable.display())
            }
            Self::WorkerChildUnresolved { tried } => {
                // Render every candidate path as one comma-separated list.
                let tried = tried
                    .iter()
                    .map(|path| path.display().to_string())
                    .collect::<Vec<_>>()
                    .join(", ");
                write!(
                    f,
                    "could not resolve lean-rs-worker-child; set LEAN_RS_WORKER_CHILD or place it beside the current executable (tried: {tried})"
                )
            }
            Self::WorkerChildNotExecutable { path, reason } => {
                write!(f, "worker child '{}' is not executable: {reason}", path.display())
            }
            Self::Bootstrap { code, message } => {
                write!(f, "worker bootstrap check {code} failed: {message}")
            }
            Self::CapabilityBuild { diagnostic } => {
                write!(f, "worker capability Lake target build failed: {diagnostic}")
            }
            Self::Setup { message } => write!(f, "worker child setup failed: {message}"),
            Self::Handshake { message } => write!(f, "worker handshake failed: {message}"),
            Self::Protocol { message } => write!(f, "worker protocol failed: {message}"),
            Self::Worker { code, message } => write!(f, "worker returned {code}: {message}"),
            // Only the status string is shown; diagnostics stay on the `exit` payload.
            Self::ChildExited { exit } => write!(f, "worker exited with {}", exit.status),
            Self::ChildPanicOrAbort { exit } => {
                write!(f, "worker exited fatally with {}", exit.status)
            }
            // `Duration` is rendered with its Debug form (e.g. `30s`).
            Self::Timeout { operation, duration } => {
                write!(f, "worker operation {operation} timed out after {duration:?}")
            }
            Self::Cancelled { operation } => write!(f, "worker operation {operation} was cancelled"),
            Self::ProgressPanic { message } => write!(f, "worker progress sink panicked: {message}"),
            Self::DataSinkPanic { message } => write!(f, "worker data sink panicked: {message}"),
            Self::DiagnosticSinkPanic { message } => {
                write!(f, "worker diagnostic sink panicked: {message}")
            }
            Self::StreamExportFailed { status } => write!(f, "streaming export returned status {status}"),
            Self::StreamCallbackFailed { status, description } => {
                write!(f, "streaming callback failed with status {status}: {description}")
            }
            Self::StreamRowMalformed { message } => write!(f, "streaming export emitted malformed row: {message}"),
            Self::CapabilityMetadataMalformed { message } => {
                write!(f, "capability metadata export returned malformed JSON: {message}")
            }
            // `expected` / `actual` are left out of the message; they remain on the variant.
            Self::CapabilityMetadataMismatch { export, .. } => {
                write!(f, "capability metadata from {export} did not match expectation")
            }
            Self::CapabilityDoctorMalformed { message } => {
                write!(f, "capability doctor export returned malformed JSON: {message}")
            }
            Self::TypedCommandRequestEncode { export, message } => {
                write!(f, "typed worker command {export} request JSON encode failed: {message}")
            }
            Self::TypedCommandResponseDecode { export, message } => {
                write!(
                    f,
                    "typed worker command {export} response JSON decode failed: {message}"
                )
            }
            Self::TypedCommandRowDecode {
                export,
                stream,
                sequence,
                message,
            } => {
                write!(
                    f,
                    "typed worker command {export} row decode failed at stream {stream} sequence {sequence}: {message}"
                )
            }
            Self::TypedCommandSummaryDecode { export, message } => {
                write!(
                    f,
                    "typed worker command {export} terminal summary decode failed: {message}"
                )
            }
            Self::LeaseInvalidated { reason } => write!(f, "worker pool lease was invalidated: {reason}"),
            Self::WorkerPoolExhausted { max_workers } => {
                write!(
                    f,
                    "worker pool cannot admit another session key; max_workers={max_workers}"
                )
            }
            Self::WorkerPoolMemoryBudgetExceeded { current_kib, limit_kib } => {
                write!(
                    f,
                    "worker pool cannot admit work within RSS budget; current_kib={current_kib} limit_kib={limit_kib}"
                )
            }
            Self::WorkerPoolQueueTimeout { waited } => {
                write!(f, "worker pool admission timed out after {waited:?}")
            }
            Self::UnsupportedRequest { operation } => {
                write!(f, "worker operation {operation} is not supported")
            }
            Self::Wait { source } => write!(f, "failed to wait for worker child: {source}"),
        }
    }
}
502
503impl std::error::Error for LeanWorkerError {
504 fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
505 match self {
506 Self::Spawn { source, .. } | Self::Wait { source } => Some(source),
507 Self::CapabilityBuild { diagnostic } => Some(diagnostic),
508 Self::WorkerChildUnresolved { .. } | Self::WorkerChildNotExecutable { .. } | Self::Bootstrap { .. } => None,
509 Self::Setup { .. }
510 | Self::Handshake { .. }
511 | Self::Protocol { .. }
512 | Self::Worker { .. }
513 | Self::ChildExited { .. }
514 | Self::ChildPanicOrAbort { .. }
515 | Self::Timeout { .. }
516 | Self::Cancelled { .. }
517 | Self::ProgressPanic { .. }
518 | Self::DataSinkPanic { .. }
519 | Self::DiagnosticSinkPanic { .. }
520 | Self::StreamExportFailed { .. }
521 | Self::StreamCallbackFailed { .. }
522 | Self::StreamRowMalformed { .. }
523 | Self::CapabilityMetadataMalformed { .. }
524 | Self::CapabilityMetadataMismatch { .. }
525 | Self::CapabilityDoctorMalformed { .. }
526 | Self::TypedCommandRequestEncode { .. }
527 | Self::TypedCommandResponseDecode { .. }
528 | Self::TypedCommandRowDecode { .. }
529 | Self::TypedCommandSummaryDecode { .. }
530 | Self::LeaseInvalidated { .. }
531 | Self::WorkerPoolExhausted { .. }
532 | Self::WorkerPoolMemoryBudgetExceeded { .. }
533 | Self::WorkerPoolQueueTimeout { .. }
534 | Self::UnsupportedRequest { .. } => None,
535 }
536 }
537}
538
/// Handle to one worker child process, driven by requests written to its
/// stdin and responses read from its stdout.
///
/// Created by `LeanWorker::spawn`. Once `LeanWorker::status` observes that the
/// child exited, the child and pipe handles are cleared and the exit is cached.
#[derive(Debug)]
pub struct LeanWorker {
    /// Configuration the worker was spawned with. Its request timeout can be
    /// changed later via `set_request_timeout`.
    config: LeanWorkerConfig,
    /// The child process while it is still attached.
    child: Option<Child>,
    /// Buffered writer for requests sent to the child.
    stdin: Option<BufWriter<ChildStdin>>,
    /// Buffered reader for responses. It is `None` on `spawn`'s
    /// handshake-failure path and after the child exits.
    stdout: Option<BufReader<ChildStdout>>,
    /// The child's stderr pipe, read to collect exit diagnostics.
    stderr: Option<ChildStderr>,
    /// Exit recorded once observed; `status` returns it from then on.
    last_exit: Option<LeanWorkerExit>,
    /// Metadata reported by the child during the startup handshake.
    runtime_metadata: LeanWorkerRuntimeMetadata,
    /// Cumulative counters exposed through `stats`.
    stats: LeanWorkerStats,
    /// Requests since the current child started (0 at spawn).
    requests_since_restart: u64,
    /// Import-flavoured requests since the current child started (0 at spawn).
    imports_since_restart: u64,
    /// Time of the most recent activity, initialised at spawn. Presumably
    /// feeds the idle-restart check; TODO confirm.
    last_activity: Instant,
}
558
559impl LeanWorker {
560 pub fn spawn(config: &LeanWorkerConfig) -> Result<Self, LeanWorkerError> {
568 let mut command = Command::new(&config.executable);
569 command
570 .stdin(Stdio::piped())
571 .stdout(Stdio::piped())
572 .stderr(Stdio::piped())
573 .env("LEAN_ABORT_ON_PANIC", "1")
574 .env("RUST_BACKTRACE", "0");
575
576 if let Some(current_dir) = &config.current_dir {
577 command.current_dir(current_dir);
578 }
579 for (key, value) in &config.env {
580 command.env(key, value);
581 }
582
583 let mut child = command.spawn().map_err(|source| LeanWorkerError::Spawn {
584 executable: config.executable.clone(),
585 source,
586 })?;
587
588 let stdin = child
589 .stdin
590 .take()
591 .map(BufWriter::new)
592 .ok_or_else(|| LeanWorkerError::Setup {
593 message: "child stdin unavailable".to_owned(),
594 })?;
595 let stdout = child.stdout.take().ok_or_else(|| LeanWorkerError::Setup {
596 message: "child stdout unavailable".to_owned(),
597 })?;
598 let stderr = child.stderr.take();
599
600 let (sender, receiver) = mpsc::channel();
601 let _handshake_reader = thread::spawn(move || {
602 let mut stdout = BufReader::new(stdout);
603 let result = expect_handshake(&mut stdout);
604 drop(sender.send((stdout, result)));
605 });
606
607 let (stdout, runtime_metadata) = match receiver.recv_timeout(config.startup_timeout) {
608 Ok((stdout, Ok(metadata))) => (stdout, metadata),
609 Ok((_stdout, Err(err))) => {
610 let mut worker = Self {
611 config: config.clone(),
612 child: Some(child),
613 stdin: Some(stdin),
614 stdout: None,
615 stderr,
616 last_exit: None,
617 runtime_metadata: LeanWorkerRuntimeMetadata {
618 worker_version: String::new(),
619 protocol_version: crate::protocol::PROTOCOL_VERSION,
620 lean_version: None,
621 },
622 stats: LeanWorkerStats::default(),
623 requests_since_restart: 0,
624 imports_since_restart: 0,
625 last_activity: Instant::now(),
626 };
627 let exit = worker.try_record_exit();
628 return Err(match exit {
629 Some(exit) if !exit.success => LeanWorkerError::ChildPanicOrAbort { exit },
630 Some(exit) => LeanWorkerError::ChildExited { exit },
631 None => err,
632 });
633 }
634 Err(mpsc::RecvTimeoutError::Timeout) => {
635 drop(child.kill());
636 let _exit = wait_with_stderr(&mut child, stderr)?;
637 return Err(LeanWorkerError::Timeout {
638 operation: "startup",
639 duration: config.startup_timeout,
640 });
641 }
642 Err(mpsc::RecvTimeoutError::Disconnected) => {
643 return Err(LeanWorkerError::Handshake {
644 message: "handshake reader exited without a result".to_owned(),
645 });
646 }
647 };
648
649 Ok(Self {
650 config: config.clone(),
651 child: Some(child),
652 stdin: Some(stdin),
653 stdout: Some(stdout),
654 stderr,
655 last_exit: None,
656 runtime_metadata,
657 stats: LeanWorkerStats::default(),
658 requests_since_restart: 0,
659 imports_since_restart: 0,
660 last_activity: Instant::now(),
661 })
662 }
663
664 pub fn health(&mut self) -> Result<(), LeanWorkerError> {
671 self.prepare_request(false)?;
672 self.send_request(Request::Health)?;
673 self.record_request(false);
674 match self.read_response("health")? {
675 Response::HealthOk => Ok(()),
676 other @ (Response::CapabilityLoaded
677 | Response::U64 { .. }
678 | Response::HostSessionOpened
679 | Response::Elaboration { .. }
680 | Response::KernelCheck { .. }
681 | Response::Strings { .. }
682 | Response::StreamComplete { .. }
683 | Response::StreamExportFailed { .. }
684 | Response::StreamCallbackFailed { .. }
685 | Response::StreamRowMalformed { .. }
686 | Response::CapabilityMetadata { .. }
687 | Response::CapabilityDoctor { .. }
688 | Response::CapabilityMetadataMalformed { .. }
689 | Response::CapabilityDoctorMalformed { .. }
690 | Response::JsonCommand { .. }
691 | Response::RowsComplete { .. }
692 | Response::Terminating
693 | Response::Error { .. }) => Err(unexpected_response("health", &other)),
694 }
695 }
696
697 pub fn load_fixture_capability(&mut self, fixture_root: impl AsRef<Path>) -> Result<(), LeanWorkerError> {
708 self.prepare_request(true)?;
709 self.send_request(Request::LoadFixtureCapability {
710 fixture_root: path_string(fixture_root.as_ref()),
711 })?;
712 self.record_request(true);
713 match self.read_response("load_fixture_capability")? {
714 Response::CapabilityLoaded => Ok(()),
715 other @ (Response::HealthOk
716 | Response::U64 { .. }
717 | Response::HostSessionOpened
718 | Response::Elaboration { .. }
719 | Response::KernelCheck { .. }
720 | Response::Strings { .. }
721 | Response::StreamComplete { .. }
722 | Response::StreamExportFailed { .. }
723 | Response::StreamCallbackFailed { .. }
724 | Response::StreamRowMalformed { .. }
725 | Response::CapabilityMetadata { .. }
726 | Response::CapabilityDoctor { .. }
727 | Response::CapabilityMetadataMalformed { .. }
728 | Response::CapabilityDoctorMalformed { .. }
729 | Response::JsonCommand { .. }
730 | Response::RowsComplete { .. }
731 | Response::Terminating
732 | Response::Error { .. }) => Err(unexpected_response("load_fixture_capability", &other)),
733 }
734 }
735
736 pub fn call_fixture_mul(
743 &mut self,
744 fixture_root: impl AsRef<Path>,
745 lhs: u64,
746 rhs: u64,
747 ) -> Result<u64, LeanWorkerError> {
748 self.prepare_request(true)?;
749 self.send_request(Request::CallFixtureMul {
750 fixture_root: path_string(fixture_root.as_ref()),
751 lhs,
752 rhs,
753 })?;
754 self.record_request(true);
755 match self.read_response("call_fixture_mul")? {
756 Response::U64 { value } => Ok(value),
757 other @ (Response::HealthOk
758 | Response::CapabilityLoaded
759 | Response::HostSessionOpened
760 | Response::Elaboration { .. }
761 | Response::KernelCheck { .. }
762 | Response::Strings { .. }
763 | Response::StreamComplete { .. }
764 | Response::StreamExportFailed { .. }
765 | Response::StreamCallbackFailed { .. }
766 | Response::StreamRowMalformed { .. }
767 | Response::CapabilityMetadata { .. }
768 | Response::CapabilityDoctor { .. }
769 | Response::CapabilityMetadataMalformed { .. }
770 | Response::CapabilityDoctorMalformed { .. }
771 | Response::JsonCommand { .. }
772 | Response::RowsComplete { .. }
773 | Response::Terminating
774 | Response::Error { .. }) => Err(unexpected_response("call_fixture_mul", &other)),
775 }
776 }
777
778 pub fn status(&mut self) -> Result<LeanWorkerStatus, LeanWorkerError> {
784 if let Some(exit) = &self.last_exit {
785 return Ok(LeanWorkerStatus::Exited(exit.clone()));
786 }
787 let Some(child) = self.child.as_mut() else {
788 return Ok(LeanWorkerStatus::Exited(LeanWorkerExit {
789 success: false,
790 code: None,
791 status: "worker is not running".to_owned(),
792 diagnostics: String::new(),
793 }));
794 };
795 match child.try_wait().map_err(|source| LeanWorkerError::Wait { source })? {
796 Some(status) => {
797 let diagnostics = self.read_stderr();
798 let exit = LeanWorkerExit::from_status(status, diagnostics);
799 self.last_exit = Some(exit.clone());
800 self.child = None;
801 self.stdin = None;
802 self.stdout = None;
803 self.stats.exits = self.stats.exits.saturating_add(1);
804 Ok(LeanWorkerStatus::Exited(exit))
805 }
806 None => Ok(LeanWorkerStatus::Running),
807 }
808 }
809
810 #[must_use]
812 pub fn stats(&self) -> LeanWorkerStats {
813 self.stats.clone()
814 }
815
816 #[must_use]
818 pub fn runtime_metadata(&self) -> LeanWorkerRuntimeMetadata {
819 self.runtime_metadata.clone()
820 }
821
822 pub fn rss_kib(&mut self) -> Option<u64> {
828 match self.child_rss_kib() {
829 Some(value) => {
830 self.stats.last_rss_kib = Some(value);
831 Some(value)
832 }
833 None => {
834 self.stats.rss_samples_unavailable = self.stats.rss_samples_unavailable.saturating_add(1);
835 None
836 }
837 }
838 }
839
840 #[must_use]
842 pub fn request_timeout(&self) -> Duration {
843 self.config.request_timeout
844 }
845
846 pub fn set_request_timeout(&mut self, timeout: Duration) {
851 self.config.request_timeout = timeout;
852 }
853
854 pub fn cycle(&mut self) -> Result<(), LeanWorkerError> {
865 self.restart_with_reason(LeanWorkerRestartReason::Explicit)
866 }
867
868 pub(crate) fn cycle_with_restart_reason(&mut self, reason: LeanWorkerRestartReason) -> Result<(), LeanWorkerError> {
869 self.restart_with_reason(reason)
870 }
871
872 pub fn restart(&mut self) -> Result<(), LeanWorkerError> {
883 self.cycle()
884 }
885
886 #[doc(hidden)]
887 pub fn __kill_for_test(&mut self) -> Result<(), LeanWorkerError> {
894 let Some(child) = self.child.as_mut() else {
895 return Err(self.dead_error());
896 };
897 child.kill().map_err(|source| LeanWorkerError::Wait { source })?;
898 Ok(())
899 }
900
901 pub fn terminate(mut self) -> Result<LeanWorkerExit, LeanWorkerError> {
908 self.send_request(Request::Terminate)?;
909 match self.read_response("terminate")? {
910 Response::Terminating => self.wait_for_exit(),
911 other @ (Response::HealthOk
912 | Response::CapabilityLoaded
913 | Response::U64 { .. }
914 | Response::HostSessionOpened
915 | Response::Elaboration { .. }
916 | Response::KernelCheck { .. }
917 | Response::Strings { .. }
918 | Response::StreamComplete { .. }
919 | Response::StreamExportFailed { .. }
920 | Response::StreamCallbackFailed { .. }
921 | Response::StreamRowMalformed { .. }
922 | Response::CapabilityMetadata { .. }
923 | Response::CapabilityDoctor { .. }
924 | Response::CapabilityMetadataMalformed { .. }
925 | Response::CapabilityDoctorMalformed { .. }
926 | Response::JsonCommand { .. }
927 | Response::RowsComplete { .. }
928 | Response::Error { .. }) => Err(unexpected_response("terminate", &other)),
929 }
930 }
931
932 #[doc(hidden)]
933 pub fn __trigger_lean_panic_fixture(
940 mut self,
941 fixture_root: impl AsRef<Path>,
942 ) -> Result<LeanWorkerExit, LeanWorkerError> {
943 self.prepare_request(true)?;
944 self.send_request(Request::TriggerLeanPanic {
945 fixture_root: path_string(fixture_root.as_ref()),
946 })?;
947 self.record_request(true);
948 match self.read_response("trigger_lean_panic") {
949 Ok(response) => Err(unexpected_response("trigger_lean_panic", &response)),
950 Err(LeanWorkerError::ChildPanicOrAbort { exit }) => Ok(exit),
951 Err(err) => Err(err),
952 }
953 }
954
955 #[doc(hidden)]
956 pub fn __emit_test_rows(
963 &mut self,
964 streams: Vec<String>,
965 cancellation: Option<&LeanWorkerCancellationToken>,
966 data: Option<&dyn LeanWorkerDataSink>,
967 ) -> Result<u64, LeanWorkerError> {
968 const OPERATION: &str = "emit_test_rows";
969 check_cancelled(OPERATION, cancellation)?;
970 self.prepare_request(false)?;
971 self.send_request(Request::EmitTestRows { streams })?;
972 self.record_request(false);
973 match self.read_response_with_events(
974 OPERATION,
975 None,
976 cancellation,
977 data.map(LeanWorkerDataSinkTarget::Value),
978 None,
979 )? {
980 Response::RowsComplete { count } => Ok(count),
981 other @ (Response::HealthOk
982 | Response::CapabilityLoaded
983 | Response::U64 { .. }
984 | Response::HostSessionOpened
985 | Response::Elaboration { .. }
986 | Response::KernelCheck { .. }
987 | Response::Strings { .. }
988 | Response::StreamComplete { .. }
989 | Response::StreamExportFailed { .. }
990 | Response::StreamCallbackFailed { .. }
991 | Response::StreamRowMalformed { .. }
992 | Response::CapabilityMetadata { .. }
993 | Response::CapabilityDoctor { .. }
994 | Response::CapabilityMetadataMalformed { .. }
995 | Response::CapabilityDoctorMalformed { .. }
996 | Response::JsonCommand { .. }
997 | Response::Terminating
998 | Response::Error { .. }) => Err(unexpected_response(OPERATION, &other)),
999 }
1000 }
1001
1002 pub(crate) fn open_worker_session(
1003 &mut self,
1004 config: &LeanWorkerSessionConfig,
1005 cancellation: Option<&LeanWorkerCancellationToken>,
1006 progress: Option<&dyn LeanWorkerProgressSink>,
1007 ) -> Result<(), LeanWorkerError> {
1008 const OPERATION: &str = "open_worker_session";
1009 check_cancelled(OPERATION, cancellation)?;
1010 self.prepare_request(true)?;
1011 self.send_request(Request::OpenHostSession {
1012 project_root: config.project_root_string(),
1013 package: config.package().to_owned(),
1014 lib_name: config.lib_name().to_owned(),
1015 imports: config.imports().to_vec(),
1016 })?;
1017 self.record_request(true);
1018 match self.read_response_with_progress(OPERATION, progress, cancellation)? {
1019 Response::HostSessionOpened => Ok(()),
1020 other @ (Response::HealthOk
1021 | Response::CapabilityLoaded
1022 | Response::U64 { .. }
1023 | Response::Elaboration { .. }
1024 | Response::KernelCheck { .. }
1025 | Response::Strings { .. }
1026 | Response::StreamComplete { .. }
1027 | Response::StreamExportFailed { .. }
1028 | Response::StreamCallbackFailed { .. }
1029 | Response::StreamRowMalformed { .. }
1030 | Response::CapabilityMetadata { .. }
1031 | Response::CapabilityDoctor { .. }
1032 | Response::CapabilityMetadataMalformed { .. }
1033 | Response::CapabilityDoctorMalformed { .. }
1034 | Response::JsonCommand { .. }
1035 | Response::RowsComplete { .. }
1036 | Response::Terminating
1037 | Response::Error { .. }) => Err(unexpected_response(OPERATION, &other)),
1038 }
1039 }
1040
1041 pub(crate) fn worker_elaborate(
1042 &mut self,
1043 source: &str,
1044 options: &LeanWorkerElabOptions,
1045 cancellation: Option<&LeanWorkerCancellationToken>,
1046 progress: Option<&dyn LeanWorkerProgressSink>,
1047 ) -> Result<LeanWorkerElabResult, LeanWorkerError> {
1048 const OPERATION: &str = "worker_elaborate";
1049 check_cancelled(OPERATION, cancellation)?;
1050 self.prepare_request(false)?;
1051 self.send_request(Request::Elaborate {
1052 source: source.to_owned(),
1053 options: options.wire(),
1054 })?;
1055 self.record_request(false);
1056 match self.read_response_with_progress(OPERATION, progress, cancellation)? {
1057 Response::Elaboration { outcome } => Ok(outcome.into()),
1058 other @ (Response::HealthOk
1059 | Response::CapabilityLoaded
1060 | Response::U64 { .. }
1061 | Response::HostSessionOpened
1062 | Response::KernelCheck { .. }
1063 | Response::Strings { .. }
1064 | Response::StreamComplete { .. }
1065 | Response::StreamExportFailed { .. }
1066 | Response::StreamCallbackFailed { .. }
1067 | Response::StreamRowMalformed { .. }
1068 | Response::CapabilityMetadata { .. }
1069 | Response::CapabilityDoctor { .. }
1070 | Response::CapabilityMetadataMalformed { .. }
1071 | Response::CapabilityDoctorMalformed { .. }
1072 | Response::JsonCommand { .. }
1073 | Response::RowsComplete { .. }
1074 | Response::Terminating
1075 | Response::Error { .. }) => Err(unexpected_response(OPERATION, &other)),
1076 }
1077 }
1078
1079 pub(crate) fn worker_kernel_check(
1080 &mut self,
1081 source: &str,
1082 options: &LeanWorkerElabOptions,
1083 cancellation: Option<&LeanWorkerCancellationToken>,
1084 progress: Option<&dyn LeanWorkerProgressSink>,
1085 ) -> Result<LeanWorkerKernelResult, LeanWorkerError> {
1086 const OPERATION: &str = "worker_kernel_check";
1087 check_cancelled(OPERATION, cancellation)?;
1088 self.prepare_request(false)?;
1089 self.send_request(Request::KernelCheck {
1090 source: source.to_owned(),
1091 options: options.wire(),
1092 progress: progress.is_some(),
1093 })?;
1094 self.record_request(false);
1095 match self.read_response_with_progress(OPERATION, progress, cancellation)? {
1096 Response::KernelCheck { outcome } => Ok(outcome.into()),
1097 other @ (Response::HealthOk
1098 | Response::CapabilityLoaded
1099 | Response::U64 { .. }
1100 | Response::HostSessionOpened
1101 | Response::Elaboration { .. }
1102 | Response::Strings { .. }
1103 | Response::StreamComplete { .. }
1104 | Response::StreamExportFailed { .. }
1105 | Response::StreamCallbackFailed { .. }
1106 | Response::StreamRowMalformed { .. }
1107 | Response::CapabilityMetadata { .. }
1108 | Response::CapabilityDoctor { .. }
1109 | Response::CapabilityMetadataMalformed { .. }
1110 | Response::CapabilityDoctorMalformed { .. }
1111 | Response::JsonCommand { .. }
1112 | Response::RowsComplete { .. }
1113 | Response::Terminating
1114 | Response::Error { .. }) => Err(unexpected_response(OPERATION, &other)),
1115 }
1116 }
1117
1118 pub(crate) fn worker_declaration_kinds(
1119 &mut self,
1120 names: &[&str],
1121 cancellation: Option<&LeanWorkerCancellationToken>,
1122 progress: Option<&dyn LeanWorkerProgressSink>,
1123 ) -> Result<Vec<String>, LeanWorkerError> {
1124 const OPERATION: &str = "worker_declaration_kinds";
1125 check_cancelled(OPERATION, cancellation)?;
1126 self.prepare_request(false)?;
1127 self.send_request(Request::DeclarationKinds {
1128 names: names.iter().map(|name| (*name).to_owned()).collect(),
1129 progress: progress.is_some(),
1130 })?;
1131 self.record_request(false);
1132 match self.read_response_with_progress(OPERATION, progress, cancellation)? {
1133 Response::Strings { values } => Ok(values),
1134 other @ (Response::HealthOk
1135 | Response::CapabilityLoaded
1136 | Response::U64 { .. }
1137 | Response::HostSessionOpened
1138 | Response::Elaboration { .. }
1139 | Response::KernelCheck { .. }
1140 | Response::StreamComplete { .. }
1141 | Response::StreamExportFailed { .. }
1142 | Response::StreamCallbackFailed { .. }
1143 | Response::StreamRowMalformed { .. }
1144 | Response::CapabilityMetadata { .. }
1145 | Response::CapabilityDoctor { .. }
1146 | Response::CapabilityMetadataMalformed { .. }
1147 | Response::CapabilityDoctorMalformed { .. }
1148 | Response::JsonCommand { .. }
1149 | Response::RowsComplete { .. }
1150 | Response::Terminating
1151 | Response::Error { .. }) => Err(unexpected_response(OPERATION, &other)),
1152 }
1153 }
1154
1155 pub(crate) fn worker_declaration_names(
1156 &mut self,
1157 names: &[&str],
1158 cancellation: Option<&LeanWorkerCancellationToken>,
1159 progress: Option<&dyn LeanWorkerProgressSink>,
1160 ) -> Result<Vec<String>, LeanWorkerError> {
1161 const OPERATION: &str = "worker_declaration_names";
1162 check_cancelled(OPERATION, cancellation)?;
1163 self.prepare_request(false)?;
1164 self.send_request(Request::DeclarationNames {
1165 names: names.iter().map(|name| (*name).to_owned()).collect(),
1166 progress: progress.is_some(),
1167 })?;
1168 self.record_request(false);
1169 match self.read_response_with_progress(OPERATION, progress, cancellation)? {
1170 Response::Strings { values } => Ok(values),
1171 other @ (Response::HealthOk
1172 | Response::CapabilityLoaded
1173 | Response::U64 { .. }
1174 | Response::HostSessionOpened
1175 | Response::Elaboration { .. }
1176 | Response::KernelCheck { .. }
1177 | Response::StreamComplete { .. }
1178 | Response::StreamExportFailed { .. }
1179 | Response::StreamCallbackFailed { .. }
1180 | Response::StreamRowMalformed { .. }
1181 | Response::CapabilityMetadata { .. }
1182 | Response::CapabilityDoctor { .. }
1183 | Response::CapabilityMetadataMalformed { .. }
1184 | Response::CapabilityDoctorMalformed { .. }
1185 | Response::JsonCommand { .. }
1186 | Response::RowsComplete { .. }
1187 | Response::Terminating
1188 | Response::Error { .. }) => Err(unexpected_response(OPERATION, &other)),
1189 }
1190 }
1191
1192 pub(crate) fn worker_run_data_stream(
1193 &mut self,
1194 export: &str,
1195 request: &serde_json::Value,
1196 rows: &dyn LeanWorkerDataSink,
1197 diagnostics: Option<&dyn LeanWorkerDiagnosticSink>,
1198 cancellation: Option<&LeanWorkerCancellationToken>,
1199 progress: Option<&dyn LeanWorkerProgressSink>,
1200 ) -> Result<LeanWorkerStreamSummary, LeanWorkerError> {
1201 self.worker_run_data_stream_with_sink(
1202 export,
1203 request,
1204 LeanWorkerDataSinkTarget::Value(rows),
1205 diagnostics,
1206 cancellation,
1207 progress,
1208 )
1209 }
1210
1211 pub(crate) fn worker_run_data_stream_raw(
1212 &mut self,
1213 export: &str,
1214 request: &serde_json::Value,
1215 rows: &dyn LeanWorkerRawDataSink,
1216 diagnostics: Option<&dyn LeanWorkerDiagnosticSink>,
1217 cancellation: Option<&LeanWorkerCancellationToken>,
1218 progress: Option<&dyn LeanWorkerProgressSink>,
1219 ) -> Result<LeanWorkerStreamSummary, LeanWorkerError> {
1220 self.worker_run_data_stream_with_sink(
1221 export,
1222 request,
1223 LeanWorkerDataSinkTarget::Raw(rows),
1224 diagnostics,
1225 cancellation,
1226 progress,
1227 )
1228 }
1229
1230 fn worker_run_data_stream_with_sink(
1231 &mut self,
1232 export: &str,
1233 request: &serde_json::Value,
1234 rows: LeanWorkerDataSinkTarget<'_>,
1235 diagnostics: Option<&dyn LeanWorkerDiagnosticSink>,
1236 cancellation: Option<&LeanWorkerCancellationToken>,
1237 progress: Option<&dyn LeanWorkerProgressSink>,
1238 ) -> Result<LeanWorkerStreamSummary, LeanWorkerError> {
1239 const OPERATION: &str = "worker_run_data_stream";
1240 check_cancelled(OPERATION, cancellation)?;
1241 let request_json = serde_json::to_string(request).map_err(|err| LeanWorkerError::Protocol {
1242 message: format!("worker data-stream request JSON encode failed: {err}"),
1243 })?;
1244 self.prepare_request(false)?;
1245 self.send_request(Request::RunDataStream {
1246 export: export.to_owned(),
1247 request_json,
1248 progress: progress.is_some(),
1249 })?;
1250 self.record_request(false);
1251 self.stats.stream_requests = self.stats.stream_requests.saturating_add(1);
1252 match self.read_response_with_events(OPERATION, progress, cancellation, Some(rows), diagnostics)? {
1253 Response::StreamComplete { summary } => Ok(summary.into()),
1254 Response::StreamExportFailed { status_byte } => {
1255 Err(LeanWorkerError::StreamExportFailed { status: status_byte })
1256 }
1257 Response::StreamCallbackFailed {
1258 status_byte,
1259 description,
1260 } => Err(LeanWorkerError::StreamCallbackFailed {
1261 status: status_byte,
1262 description,
1263 }),
1264 Response::StreamRowMalformed { message } => Err(LeanWorkerError::StreamRowMalformed { message }),
1265 other @ (Response::HealthOk
1266 | Response::CapabilityLoaded
1267 | Response::U64 { .. }
1268 | Response::HostSessionOpened
1269 | Response::Elaboration { .. }
1270 | Response::KernelCheck { .. }
1271 | Response::Strings { .. }
1272 | Response::RowsComplete { .. }
1273 | Response::CapabilityMetadata { .. }
1274 | Response::CapabilityDoctor { .. }
1275 | Response::CapabilityMetadataMalformed { .. }
1276 | Response::CapabilityDoctorMalformed { .. }
1277 | Response::JsonCommand { .. }
1278 | Response::Terminating
1279 | Response::Error { .. }) => Err(unexpected_response(OPERATION, &other)),
1280 }
1281 }
1282
1283 pub(crate) fn worker_capability_metadata(
1284 &mut self,
1285 export: &str,
1286 request: &serde_json::Value,
1287 cancellation: Option<&LeanWorkerCancellationToken>,
1288 progress: Option<&dyn LeanWorkerProgressSink>,
1289 ) -> Result<LeanWorkerCapabilityMetadata, LeanWorkerError> {
1290 const OPERATION: &str = "worker_capability_metadata";
1291 check_cancelled(OPERATION, cancellation)?;
1292 let request_json = serde_json::to_string(request).map_err(|err| LeanWorkerError::Protocol {
1293 message: format!("worker capability metadata request JSON encode failed: {err}"),
1294 })?;
1295 self.prepare_request(false)?;
1296 self.send_request(Request::CapabilityMetadata {
1297 export: export.to_owned(),
1298 request_json,
1299 })?;
1300 self.record_request(false);
1301 match self.read_response_with_progress(OPERATION, progress, cancellation)? {
1302 Response::CapabilityMetadata { metadata } => Ok(metadata.into()),
1303 Response::CapabilityMetadataMalformed { message } => {
1304 Err(LeanWorkerError::CapabilityMetadataMalformed { message })
1305 }
1306 other @ (Response::HealthOk
1307 | Response::CapabilityLoaded
1308 | Response::U64 { .. }
1309 | Response::HostSessionOpened
1310 | Response::Elaboration { .. }
1311 | Response::KernelCheck { .. }
1312 | Response::Strings { .. }
1313 | Response::StreamComplete { .. }
1314 | Response::StreamExportFailed { .. }
1315 | Response::StreamCallbackFailed { .. }
1316 | Response::StreamRowMalformed { .. }
1317 | Response::CapabilityDoctor { .. }
1318 | Response::CapabilityDoctorMalformed { .. }
1319 | Response::JsonCommand { .. }
1320 | Response::RowsComplete { .. }
1321 | Response::Terminating
1322 | Response::Error { .. }) => Err(unexpected_response(OPERATION, &other)),
1323 }
1324 }
1325
1326 pub(crate) fn worker_capability_doctor(
1327 &mut self,
1328 export: &str,
1329 request: &serde_json::Value,
1330 cancellation: Option<&LeanWorkerCancellationToken>,
1331 progress: Option<&dyn LeanWorkerProgressSink>,
1332 ) -> Result<LeanWorkerDoctorReport, LeanWorkerError> {
1333 const OPERATION: &str = "worker_capability_doctor";
1334 check_cancelled(OPERATION, cancellation)?;
1335 let request_json = serde_json::to_string(request).map_err(|err| LeanWorkerError::Protocol {
1336 message: format!("worker capability doctor request JSON encode failed: {err}"),
1337 })?;
1338 self.prepare_request(false)?;
1339 self.send_request(Request::CapabilityDoctor {
1340 export: export.to_owned(),
1341 request_json,
1342 })?;
1343 self.record_request(false);
1344 match self.read_response_with_progress(OPERATION, progress, cancellation)? {
1345 Response::CapabilityDoctor { report } => Ok(report.into()),
1346 Response::CapabilityDoctorMalformed { message } => {
1347 Err(LeanWorkerError::CapabilityDoctorMalformed { message })
1348 }
1349 other @ (Response::HealthOk
1350 | Response::CapabilityLoaded
1351 | Response::U64 { .. }
1352 | Response::HostSessionOpened
1353 | Response::Elaboration { .. }
1354 | Response::KernelCheck { .. }
1355 | Response::Strings { .. }
1356 | Response::StreamComplete { .. }
1357 | Response::StreamExportFailed { .. }
1358 | Response::StreamCallbackFailed { .. }
1359 | Response::StreamRowMalformed { .. }
1360 | Response::CapabilityMetadata { .. }
1361 | Response::CapabilityMetadataMalformed { .. }
1362 | Response::JsonCommand { .. }
1363 | Response::RowsComplete { .. }
1364 | Response::Terminating
1365 | Response::Error { .. }) => Err(unexpected_response(OPERATION, &other)),
1366 }
1367 }
1368
1369 pub(crate) fn worker_json_command(
1370 &mut self,
1371 export: &str,
1372 request_json: String,
1373 cancellation: Option<&LeanWorkerCancellationToken>,
1374 progress: Option<&dyn LeanWorkerProgressSink>,
1375 ) -> Result<String, LeanWorkerError> {
1376 const OPERATION: &str = "worker_json_command";
1377 check_cancelled(OPERATION, cancellation)?;
1378 self.prepare_request(false)?;
1379 self.send_request(Request::JsonCommand {
1380 export: export.to_owned(),
1381 request_json,
1382 })?;
1383 self.record_request(false);
1384 match self.read_response_with_progress(OPERATION, progress, cancellation)? {
1385 Response::JsonCommand { response_json } => Ok(response_json),
1386 other @ (Response::HealthOk
1387 | Response::CapabilityLoaded
1388 | Response::U64 { .. }
1389 | Response::HostSessionOpened
1390 | Response::Elaboration { .. }
1391 | Response::KernelCheck { .. }
1392 | Response::Strings { .. }
1393 | Response::StreamComplete { .. }
1394 | Response::StreamExportFailed { .. }
1395 | Response::StreamCallbackFailed { .. }
1396 | Response::StreamRowMalformed { .. }
1397 | Response::CapabilityMetadata { .. }
1398 | Response::CapabilityDoctor { .. }
1399 | Response::CapabilityMetadataMalformed { .. }
1400 | Response::CapabilityDoctorMalformed { .. }
1401 | Response::RowsComplete { .. }
1402 | Response::Terminating
1403 | Response::Error { .. }) => Err(unexpected_response(OPERATION, &other)),
1404 }
1405 }
1406
1407 fn send_request(&mut self, request: Request) -> Result<(), LeanWorkerError> {
1408 self.ensure_running()?;
1409 let Some(stdin) = self.stdin.as_mut() else {
1410 return Err(self.dead_error());
1411 };
1412 write_frame(stdin, Message::Request(request)).map_err(|err| LeanWorkerError::Protocol {
1413 message: err.to_string(),
1414 })
1415 }
1416
1417 fn prepare_request(&mut self, import_like: bool) -> Result<(), LeanWorkerError> {
1418 self.ensure_running()?;
1419
1420 if let Some(limit) = self.config.restart_policy.max_requests
1421 && self.requests_since_restart >= limit
1422 {
1423 return self.restart_with_reason(LeanWorkerRestartReason::MaxRequests { limit });
1424 }
1425
1426 if import_like
1427 && let Some(limit) = self.config.restart_policy.max_imports
1428 && self.imports_since_restart >= limit
1429 {
1430 return self.restart_with_reason(LeanWorkerRestartReason::MaxImports { limit });
1431 }
1432
1433 if let Some(limit_kib) = self.config.restart_policy.max_rss_kib {
1434 match self.child_rss_kib() {
1435 Some(current_kib) if current_kib >= limit_kib => {
1436 self.stats.last_rss_kib = Some(current_kib);
1437 return self.restart_with_reason(LeanWorkerRestartReason::RssCeiling { current_kib, limit_kib });
1438 }
1439 Some(current_kib) => {
1440 self.stats.last_rss_kib = Some(current_kib);
1441 }
1442 None => {
1443 self.stats.rss_samples_unavailable = self.stats.rss_samples_unavailable.saturating_add(1);
1444 }
1445 }
1446 }
1447
1448 if let Some(limit) = self.config.restart_policy.idle_restart_after {
1449 let idle_for = self.last_activity.elapsed();
1450 if idle_for >= limit {
1451 return self.restart_with_reason(LeanWorkerRestartReason::Idle { idle_for, limit });
1452 }
1453 }
1454
1455 Ok(())
1456 }
1457
1458 fn record_request(&mut self, import_like: bool) {
1459 self.stats.requests = self.stats.requests.saturating_add(1);
1460 self.requests_since_restart = self.requests_since_restart.saturating_add(1);
1461 if import_like {
1462 self.stats.imports = self.stats.imports.saturating_add(1);
1463 self.imports_since_restart = self.imports_since_restart.saturating_add(1);
1464 }
1465 self.last_activity = Instant::now();
1466 }
1467
1468 fn restart_with_reason(&mut self, reason: LeanWorkerRestartReason) -> Result<(), LeanWorkerError> {
1469 let config = self.config.clone();
1470 self.stop_existing_child()?;
1471 self.stats.record_restart(reason);
1472 self.requests_since_restart = 0;
1473 self.imports_since_restart = 0;
1474 let mut next = Self::spawn(&config)?;
1475 next.stats = self.stats.clone();
1476 next.last_activity = Instant::now();
1477 *self = next;
1478 Ok(())
1479 }
1480
1481 fn read_response(&mut self, operation: &'static str) -> Result<Response, LeanWorkerError> {
1482 self.read_response_with_events(operation, None, None, None, None)
1483 }
1484
1485 fn read_response_with_progress(
1486 &mut self,
1487 operation: &'static str,
1488 progress: Option<&dyn LeanWorkerProgressSink>,
1489 cancellation: Option<&LeanWorkerCancellationToken>,
1490 ) -> Result<Response, LeanWorkerError> {
1491 self.read_response_with_events(operation, progress, cancellation, None, None)
1492 }
1493
1494 fn read_response_with_events(
1495 &mut self,
1496 operation: &'static str,
1497 progress: Option<&dyn LeanWorkerProgressSink>,
1498 cancellation: Option<&LeanWorkerCancellationToken>,
1499 data: Option<LeanWorkerDataSinkTarget<'_>>,
1500 diagnostics: Option<&dyn LeanWorkerDiagnosticSink>,
1501 ) -> Result<Response, LeanWorkerError> {
1502 let started = Instant::now();
1503 let timeout = self.config.request_timeout;
1504 let deadline = started.checked_add(timeout);
1505 let streaming = data.is_some();
1506 let mut request_backpressure_waits = 0_u64;
1507 let stdout = self.stdout.take().ok_or_else(|| self.dead_error())?;
1508 let (sender, receiver) = mpsc::sync_channel(WORKER_EVENT_BUFFER_CAPACITY);
1509 let _reader = thread::spawn(move || read_request_messages(stdout, sender));
1510
1511 loop {
1512 let event = match deadline.and_then(|deadline| deadline.checked_duration_since(Instant::now())) {
1513 Some(remaining) if remaining.is_zero() => {
1514 if streaming {
1515 self.record_stream_failure(started, request_backpressure_waits);
1516 }
1517 self.restart_with_reason(LeanWorkerRestartReason::RequestTimeout {
1518 operation,
1519 duration: timeout,
1520 })?;
1521 return Err(LeanWorkerError::Timeout {
1522 operation,
1523 duration: timeout,
1524 });
1525 }
1526 Some(remaining) => match receiver.recv_timeout(remaining) {
1527 Ok(event) => event,
1528 Err(mpsc::RecvTimeoutError::Timeout) => {
1529 if streaming {
1530 self.record_stream_failure(started, request_backpressure_waits);
1531 }
1532 self.restart_with_reason(LeanWorkerRestartReason::RequestTimeout {
1533 operation,
1534 duration: timeout,
1535 })?;
1536 return Err(LeanWorkerError::Timeout {
1537 operation,
1538 duration: timeout,
1539 });
1540 }
1541 Err(mpsc::RecvTimeoutError::Disconnected) => {
1542 return Err(LeanWorkerError::Protocol {
1543 message: "worker response reader exited without a terminal response".to_owned(),
1544 });
1545 }
1546 },
1547 None => match receiver.recv() {
1548 Ok(event) => event,
1549 Err(_err) => {
1550 return Err(LeanWorkerError::Protocol {
1551 message: "worker response reader exited without a terminal response".to_owned(),
1552 });
1553 }
1554 },
1555 };
1556 request_backpressure_waits = request_backpressure_waits.saturating_add(event.backpressure_waits());
1557 self.stats.backpressure_waits = self.stats.backpressure_waits.saturating_add(event.backpressure_waits());
1558
1559 let message = match event {
1560 RequestReaderEvent::Message { message, .. } => message,
1561 RequestReaderEvent::Terminal { message, stdout, .. } => {
1562 self.stdout = Some(stdout);
1563 match message {
1564 Message::Response(Response::Error { code, message }) => {
1565 if streaming {
1566 self.record_stream_failure(started, request_backpressure_waits);
1567 }
1568 return Err(LeanWorkerError::Worker { code, message });
1569 }
1570 Message::Response(response) => {
1571 if streaming {
1572 if matches!(response, Response::StreamComplete { .. }) {
1573 self.record_stream_success(started);
1574 } else {
1575 self.record_stream_failure(started, request_backpressure_waits);
1576 }
1577 }
1578 return Ok(response);
1579 }
1580 other @ (Message::Handshake { .. }
1581 | Message::Request(_)
1582 | Message::Diagnostic(_)
1583 | Message::ProgressTick(_)
1584 | Message::DataRow(_)
1585 | Message::FatalExit(_)) => {
1586 return Err(LeanWorkerError::Protocol {
1587 message: format!("worker sent unexpected {operation} message: {other:?}"),
1588 });
1589 }
1590 }
1591 }
1592 RequestReaderEvent::ReadError { message, eof, .. } => {
1593 if streaming {
1594 self.record_stream_failure(started, request_backpressure_waits);
1595 }
1596 return if eof {
1597 Err(self.record_exit_error())
1598 } else {
1599 Err(LeanWorkerError::Protocol { message })
1600 };
1601 }
1602 };
1603
1604 match message {
1605 Message::ProgressTick(tick) => {
1606 if let Err(err) =
1607 report_parent_progress(progress, elapsed_event(tick.phase, tick.current, tick.total, started))
1608 {
1609 if streaming {
1610 self.record_stream_failure(started, request_backpressure_waits);
1611 }
1612 return Err(err);
1613 }
1614 if cancellation.is_some_and(LeanWorkerCancellationToken::is_cancelled) {
1615 if streaming {
1616 self.record_stream_failure(started, request_backpressure_waits);
1617 }
1618 self.restart_with_reason(LeanWorkerRestartReason::Cancelled { operation })?;
1619 return Err(LeanWorkerError::Cancelled { operation });
1620 }
1621 }
1622 Message::DataRow(row) => {
1623 let payload_bytes = row.payload.get().len() as u64;
1624 if let Err(err) = report_parent_data_row(data, row) {
1625 if streaming {
1626 self.record_stream_failure(started, request_backpressure_waits);
1627 }
1628 return Err(err);
1629 }
1630 self.stats.data_rows_delivered = self.stats.data_rows_delivered.saturating_add(1);
1631 self.stats.data_row_payload_bytes = self.stats.data_row_payload_bytes.saturating_add(payload_bytes);
1632 if cancellation.is_some_and(LeanWorkerCancellationToken::is_cancelled) {
1633 if streaming {
1634 self.record_stream_failure(started, request_backpressure_waits);
1635 }
1636 self.restart_with_reason(LeanWorkerRestartReason::Cancelled { operation })?;
1637 return Err(LeanWorkerError::Cancelled { operation });
1638 }
1639 }
1640 Message::Diagnostic(diagnostic) => {
1641 if let Err(err) = report_parent_diagnostic(diagnostics, diagnostic.into()) {
1642 if streaming {
1643 self.record_stream_failure(started, request_backpressure_waits);
1644 }
1645 return Err(err);
1646 }
1647 }
1648 Message::Response(response) => return Err(unexpected_response(operation, &response)),
1649 other @ (Message::Handshake { .. } | Message::Request(_) | Message::FatalExit(_)) => {
1650 return Err(LeanWorkerError::Protocol {
1651 message: format!("worker sent unexpected {operation} message: {other:?}"),
1652 });
1653 }
1654 }
1655 }
1656 }
1657
1658 fn ensure_running(&mut self) -> Result<(), LeanWorkerError> {
1659 match self.status()? {
1660 LeanWorkerStatus::Running => Ok(()),
1661 LeanWorkerStatus::Exited(exit) if exit.success => Err(LeanWorkerError::ChildExited { exit }),
1662 LeanWorkerStatus::Exited(exit) => Err(LeanWorkerError::ChildPanicOrAbort { exit }),
1663 }
1664 }
1665
1666 fn record_stream_success(&mut self, started: Instant) {
1667 self.stats.stream_successes = self.stats.stream_successes.saturating_add(1);
1668 self.stats.stream_elapsed = self.stats.stream_elapsed.saturating_add(started.elapsed());
1669 }
1670
1671 fn record_stream_failure(&mut self, started: Instant, backpressure_waits: u64) {
1672 self.stats.stream_failures = self.stats.stream_failures.saturating_add(1);
1673 self.stats.stream_elapsed = self.stats.stream_elapsed.saturating_add(started.elapsed());
1674 if backpressure_waits > 0 {
1675 self.stats.backpressure_failures = self.stats.backpressure_failures.saturating_add(1);
1676 }
1677 }
1678
1679 fn wait_for_exit(&mut self) -> Result<LeanWorkerExit, LeanWorkerError> {
1680 let Some(child) = self.child.as_mut() else {
1681 return Err(self.dead_error());
1682 };
1683 let status = child.wait().map_err(|source| LeanWorkerError::Wait { source })?;
1684 let diagnostics = self.read_stderr();
1685 let exit = LeanWorkerExit::from_status(status, diagnostics);
1686 self.last_exit = Some(exit.clone());
1687 self.child = None;
1688 self.stdin = None;
1689 self.stdout = None;
1690 self.stats.exits = self.stats.exits.saturating_add(1);
1691 Ok(exit)
1692 }
1693
1694 fn try_record_exit(&mut self) -> Option<LeanWorkerExit> {
1695 let child = self.child.as_mut()?;
1696 let status = child.try_wait().ok().flatten()?;
1697 let diagnostics = self.read_stderr();
1698 let exit = LeanWorkerExit::from_status(status, diagnostics);
1699 self.last_exit = Some(exit.clone());
1700 self.child = None;
1701 self.stdin = None;
1702 self.stdout = None;
1703 self.stats.exits = self.stats.exits.saturating_add(1);
1704 Some(exit)
1705 }
1706
1707 fn record_exit_error(&mut self) -> LeanWorkerError {
1708 match self.wait_for_exit() {
1709 Ok(exit) if exit.success => LeanWorkerError::ChildExited { exit },
1710 Ok(exit) => LeanWorkerError::ChildPanicOrAbort { exit },
1711 Err(err) => err,
1712 }
1713 }
1714
1715 fn stop_existing_child(&mut self) -> Result<(), LeanWorkerError> {
1716 if let Some(child) = self.child.as_mut() {
1717 drop(child.kill());
1718 let status = child.wait().map_err(|source| LeanWorkerError::Wait { source })?;
1719 let diagnostics = self.read_stderr();
1720 self.last_exit = Some(LeanWorkerExit::from_status(status, diagnostics));
1721 self.stats.exits = self.stats.exits.saturating_add(1);
1722 }
1723 self.child = None;
1724 self.stdin = None;
1725 self.stdout = None;
1726 Ok(())
1727 }
1728
1729 fn dead_error(&self) -> LeanWorkerError {
1730 let exit = self.last_exit.clone().unwrap_or_else(|| LeanWorkerExit {
1731 success: false,
1732 code: None,
1733 status: "worker is not running".to_owned(),
1734 diagnostics: String::new(),
1735 });
1736 if exit.success {
1737 LeanWorkerError::ChildExited { exit }
1738 } else {
1739 LeanWorkerError::ChildPanicOrAbort { exit }
1740 }
1741 }
1742
1743 fn read_stderr(&mut self) -> String {
1744 let mut diagnostics = String::new();
1745 if let Some(mut pipe) = self.stderr.take() {
1746 drop(pipe.read_to_string(&mut diagnostics));
1747 }
1748 diagnostics
1749 }
1750
1751 fn child_rss_kib(&mut self) -> Option<u64> {
1752 let child = self.child.as_mut()?;
1753 child_rss_kib(child.id())
1754 }
1755}
1756
/// Event forwarded from the per-request reader thread (`read_request_messages`) to
/// `LeanWorker::read_response_with_events` over a bounded channel.
///
/// Every variant carries `backpressure_waits`: how many times the reader found the
/// channel full before this event could be queued. `send_reader_event` adds at most
/// one wait per event.
enum RequestReaderEvent {
    /// A non-terminal frame, i.e. anything other than `Message::Response`.
    Message {
        message: Message,
        backpressure_waits: u64,
    },
    /// The `Message::Response` frame that ends the request. It also hands the worker's
    /// stdout reader back to the caller.
    Terminal {
        message: Message,
        stdout: BufReader<ChildStdout>,
        backpressure_waits: u64,
    },
    /// `read_frame` failed. `message` is the error text, and `eof` is the error's
    /// `is_eof()` flag.
    ReadError {
        message: String,
        eof: bool,
        backpressure_waits: u64,
    },
}
1773
1774impl RequestReaderEvent {
1775 fn backpressure_waits(&self) -> u64 {
1776 match self {
1777 Self::Message { backpressure_waits, .. }
1778 | Self::Terminal { backpressure_waits, .. }
1779 | Self::ReadError { backpressure_waits, .. } => *backpressure_waits,
1780 }
1781 }
1782
1783 fn add_backpressure_wait(&mut self) {
1784 match self {
1785 Self::Message { backpressure_waits, .. }
1786 | Self::Terminal { backpressure_waits, .. }
1787 | Self::ReadError { backpressure_waits, .. } => {
1788 *backpressure_waits = backpressure_waits.saturating_add(1);
1789 }
1790 }
1791 }
1792}
1793
1794#[allow(
1795 clippy::needless_pass_by_value,
1796 reason = "the request reader thread must own the sender"
1797)]
1798fn read_request_messages(mut stdout: BufReader<ChildStdout>, sender: mpsc::SyncSender<RequestReaderEvent>) {
1799 loop {
1800 match read_frame(&mut stdout) {
1801 Ok(frame) if matches!(frame.message, Message::Response(_)) => {
1802 let _ = send_reader_event(
1803 &sender,
1804 RequestReaderEvent::Terminal {
1805 message: frame.message,
1806 stdout,
1807 backpressure_waits: 0,
1808 },
1809 );
1810 return;
1811 }
1812 Ok(frame) => {
1813 if send_reader_event(
1814 &sender,
1815 RequestReaderEvent::Message {
1816 message: frame.message,
1817 backpressure_waits: 0,
1818 },
1819 )
1820 .is_err()
1821 {
1822 return;
1823 }
1824 }
1825 Err(err) => {
1826 let _ = send_reader_event(
1827 &sender,
1828 RequestReaderEvent::ReadError {
1829 message: err.to_string(),
1830 eof: err.is_eof(),
1831 backpressure_waits: 0,
1832 },
1833 );
1834 return;
1835 }
1836 }
1837 }
1838}
1839
1840fn send_reader_event(sender: &mpsc::SyncSender<RequestReaderEvent>, event: RequestReaderEvent) -> Result<(), ()> {
1841 match sender.try_send(event) {
1842 Ok(()) => Ok(()),
1843 Err(mpsc::TrySendError::Full(mut event)) => {
1844 event.add_backpressure_wait();
1845 sender.send(event).map_err(|_| ())
1846 }
1847 Err(mpsc::TrySendError::Disconnected(_event)) => Err(()),
1848 }
1849}
1850
1851impl Drop for LeanWorker {
1852 fn drop(&mut self) {
1853 if let Some(child) = self.child.as_mut() {
1854 drop(child.kill());
1855 drop(child.wait());
1856 }
1857 }
1858}
1859
1860fn expect_handshake(stdout: &mut BufReader<ChildStdout>) -> Result<LeanWorkerRuntimeMetadata, LeanWorkerError> {
1861 let frame = read_frame(stdout).map_err(|err| {
1862 if err.is_eof() {
1863 LeanWorkerError::Handshake {
1864 message: "child closed stdout before handshake".to_owned(),
1865 }
1866 } else {
1867 LeanWorkerError::Handshake {
1868 message: err.to_string(),
1869 }
1870 }
1871 })?;
1872 match frame.message {
1873 Message::Handshake {
1874 worker_version,
1875 protocol_version,
1876 } if protocol_version == crate::protocol::PROTOCOL_VERSION => Ok(LeanWorkerRuntimeMetadata {
1877 worker_version,
1878 protocol_version,
1879 lean_version: None,
1880 }),
1881 other @ (Message::Handshake { .. }
1882 | Message::Request(_)
1883 | Message::Response(_)
1884 | Message::Diagnostic(_)
1885 | Message::ProgressTick(_)
1886 | Message::DataRow(_)
1887 | Message::FatalExit(_)) => Err(LeanWorkerError::Handshake {
1888 message: format!("unexpected handshake frame: {other:?}"),
1889 }),
1890 }
1891}
1892
1893fn wait_with_stderr(child: &mut Child, stderr: Option<ChildStderr>) -> Result<LeanWorkerExit, LeanWorkerError> {
1894 let status = child.wait().map_err(|source| LeanWorkerError::Wait { source })?;
1895 let mut diagnostics = String::new();
1896 if let Some(mut pipe) = stderr {
1897 drop(pipe.read_to_string(&mut diagnostics));
1898 }
1899 Ok(LeanWorkerExit::from_status(status, diagnostics))
1900}
1901
1902fn unexpected_response(operation: &'static str, response: &Response) -> LeanWorkerError {
1903 LeanWorkerError::Protocol {
1904 message: format!("worker sent unexpected {operation} response: {response:?}"),
1905 }
1906}
1907
/// Converts `path` to an owned `String`, replacing any non-UTF-8 sequences with
/// U+FFFD.
fn path_string(path: &Path) -> String {
    String::from(path.to_string_lossy())
}
1911
#[cfg(target_os = "linux")]
/// Reads the resident set size (in KiB, as reported by the `VmRSS:` line) of
/// process `pid` from `/proc/<pid>/status`.
///
/// Returns `None` when the file cannot be read or no `VmRSS:` line has a parseable
/// first field.
fn child_rss_kib(pid: u32) -> Option<u64> {
    let status_path = format!("/proc/{pid}/status");
    let status = std::fs::read_to_string(status_path).ok()?;
    status
        .lines()
        .filter_map(|line| line.strip_prefix("VmRSS:"))
        .find_map(|rest| rest.split_whitespace().next()?.parse::<u64>().ok())
}
1920
#[cfg(not(target_os = "linux"))]
/// Samples the resident set size (KiB) of process `pid` by running `ps -o rss= -p <pid>`.
///
/// Returns `None` in these cases:
/// - `ps` cannot be run or exits unsuccessfully;
/// - its trimmed output is not an integer;
/// - the reported value is zero.
fn child_rss_kib(pid: u32) -> Option<u64> {
    let pid_arg = pid.to_string();
    let output = Command::new("ps")
        .args(["-o", "rss=", "-p", pid_arg.as_str()])
        .output()
        .ok()?;
    if !output.status.success() {
        return None;
    }
    match String::from_utf8_lossy(&output.stdout).trim().parse::<u64>() {
        Ok(kib) if kib > 0 => Some(kib),
        Ok(_) | Err(_) => None,
    }
}