1use std::collections::VecDeque;
2use std::ffi::OsString;
3use std::fmt;
4use std::io::{BufReader, BufWriter, Read as _};
5use std::path::{Path, PathBuf};
6use std::process::{Child, ChildStderr, ChildStdin, ChildStdout, Command, ExitStatus, Stdio};
7use std::sync::Mutex;
8use std::sync::mpsc;
9use std::thread;
10use std::time::{Duration, Instant};
11
12use lean_rs_worker_protocol::protocol::{
13 HostSessionMode, MAX_FRAME_BYTES, MAX_FRAME_BYTES_HARD_CAP, MIN_FRAME_BYTES, Message, Request, Response,
14 read_frame, write_frame,
15};
16use lean_rs_worker_protocol::types::{
17 LeanWorkerCapabilityMetadata, LeanWorkerDeclarationFilter, LeanWorkerDeclarationInspectionRequest,
18 LeanWorkerDeclarationInspectionResult, LeanWorkerDeclarationRow, LeanWorkerDeclarationSearch,
19 LeanWorkerDeclarationSearchResult, LeanWorkerDeclarationType, LeanWorkerDeclarationVerificationBatchRequest,
20 LeanWorkerDeclarationVerificationBatchResult, LeanWorkerDeclarationVerificationBatchRow,
21 LeanWorkerDeclarationVerificationFacts, LeanWorkerDeclarationVerificationRequest,
22 LeanWorkerDeclarationVerificationResult, LeanWorkerDeclarationVerificationStatus, LeanWorkerDoctorReport,
23 LeanWorkerElabOptions, LeanWorkerElabResult, LeanWorkerImportStats, LeanWorkerKernelResult, LeanWorkerMetaResult,
24 LeanWorkerMetaTransparency, LeanWorkerModuleQuery, LeanWorkerModuleQueryBatchEnvelope,
25 LeanWorkerModuleQueryBatchItem, LeanWorkerModuleQueryBatchOutcome, LeanWorkerModuleQueryCacheFacts,
26 LeanWorkerModuleQueryOutcome, LeanWorkerModuleQuerySelector, LeanWorkerModuleSnapshotCacheClearResult,
27 LeanWorkerOutputBudgets, LeanWorkerProofAttemptRequest, LeanWorkerProofAttemptResult, LeanWorkerRendered,
28 LeanWorkerResourceExhaustedFacts,
29};
30use lean_rs_worker_protocol::worker_exports::{fixture_mul_signature, fixture_panic_signature};
31
32use crate::capability::LeanWorkerBootstrapDiagnosticCode;
33use crate::session::LeanWorkerDataSinkTarget;
34use crate::session::{
35 LeanWorkerCancellationToken, LeanWorkerDataSink, LeanWorkerDiagnosticSink, LeanWorkerProgressSink,
36 LeanWorkerRawDataRow, LeanWorkerRawDataSink, LeanWorkerRuntimeMetadata, LeanWorkerSessionConfig,
37 LeanWorkerSessionMode, LeanWorkerStreamSummary, check_cancelled, elapsed_event, report_parent_data_row,
38 report_parent_diagnostic, report_parent_progress,
39};
40
41const DEFAULT_STARTUP_TIMEOUT: Duration = Duration::from_secs(10);
42const WORKER_EVENT_BUFFER_CAPACITY: usize = 64;
43const DEFAULT_RESTART_INTENSITY_LIMIT: u64 = 16;
44const DEFAULT_RESTART_INTENSITY_WINDOW: Duration = Duration::from_mins(1);
45
46pub const LEAN_WORKER_REQUEST_TIMEOUT_DEFAULT: Duration = Duration::from_secs(30);
48
49pub const LEAN_WORKER_REQUEST_TIMEOUT_LONG_RUNNING: Duration = Duration::from_mins(10);
51
52pub const LEAN_WORKER_SHUTDOWN_TIMEOUT_DEFAULT: Duration = Duration::from_secs(2);
54
55pub const LEAN_WORKER_KILL_WAIT_TIMEOUT_DEFAULT: Duration = Duration::from_secs(5);
57
58#[derive(Clone, Debug)]
80pub struct LeanWorkerConfig {
81 executable: PathBuf,
82 current_dir: Option<PathBuf>,
83 env: Vec<(OsString, OsString)>,
84 startup_timeout: Duration,
85 request_timeout: Duration,
86 shutdown_timeout: Duration,
87 restart_policy: LeanWorkerRestartPolicy,
88 rss_hard_limit_kib: Option<u64>,
89 rss_sample_interval: Duration,
90 max_frame_bytes: u32,
91}
92
93impl LeanWorkerConfig {
94 pub fn new(executable: impl Into<PathBuf>) -> Self {
96 Self {
97 executable: executable.into(),
98 current_dir: None,
99 env: Vec::new(),
100 startup_timeout: DEFAULT_STARTUP_TIMEOUT,
101 request_timeout: LEAN_WORKER_REQUEST_TIMEOUT_DEFAULT,
102 shutdown_timeout: LEAN_WORKER_SHUTDOWN_TIMEOUT_DEFAULT,
103 restart_policy: LeanWorkerRestartPolicy::default(),
104 rss_hard_limit_kib: None,
105 rss_sample_interval: Duration::from_millis(250),
106 max_frame_bytes: MAX_FRAME_BYTES,
107 }
108 }
109
110 pub fn executable(&self) -> &Path {
112 &self.executable
113 }
114
115 #[must_use]
117 pub fn current_dir(mut self, path: impl Into<PathBuf>) -> Self {
118 self.current_dir = Some(path.into());
119 self
120 }
121
122 #[must_use]
124 pub fn env(mut self, key: impl Into<OsString>, value: impl Into<OsString>) -> Self {
125 self.env.push((key.into(), value.into()));
126 self
127 }
128
129 #[cfg(test)]
130 pub(crate) fn env_overrides(&self) -> &[(OsString, OsString)] {
131 &self.env
132 }
133
134 #[must_use]
136 pub fn startup_timeout(mut self, timeout: Duration) -> Self {
137 self.startup_timeout = timeout;
138 self
139 }
140
141 #[must_use]
147 pub fn request_timeout(mut self, timeout: Duration) -> Self {
148 self.request_timeout = timeout;
149 self
150 }
151
152 #[must_use]
158 pub fn shutdown_timeout(mut self, timeout: Duration) -> Self {
159 self.shutdown_timeout = timeout;
160 self
161 }
162
163 #[must_use]
165 pub fn long_running_requests(mut self) -> Self {
166 self.request_timeout = LEAN_WORKER_REQUEST_TIMEOUT_LONG_RUNNING;
167 self
168 }
169
170 #[must_use]
176 pub fn restart_policy(mut self, policy: LeanWorkerRestartPolicy) -> Self {
177 self.restart_policy = policy;
178 self
179 }
180
181 #[must_use]
188 pub fn rss_hard_limit(mut self, limit_kib: u64, sample_interval: Duration) -> Self {
189 self.rss_hard_limit_kib = Some(limit_kib.max(1));
190 self.rss_sample_interval = sample_interval.max(Duration::from_millis(1));
191 self
192 }
193
194 #[must_use]
209 pub fn max_frame_bytes(mut self, max_frame_bytes: u32) -> Self {
210 self.max_frame_bytes = max_frame_bytes.clamp(MIN_FRAME_BYTES, MAX_FRAME_BYTES_HARD_CAP);
211 self
212 }
213}
214
215#[derive(Clone, Debug, Default, Eq, PartialEq)]
222pub struct LeanWorkerRestartPolicy {
223 max_requests: Option<u64>,
224 max_imports: Option<u64>,
225 max_rss_kib: Option<u64>,
226 idle_restart_after: Option<Duration>,
227 restart_intensity: RestartIntensityLimit,
228}
229
230#[derive(Clone, Copy, Debug, Eq, PartialEq)]
231struct RestartIntensityLimit {
232 max_restarts: u64,
233 window: Duration,
234}
235
236impl Default for RestartIntensityLimit {
237 fn default() -> Self {
238 Self {
239 max_restarts: DEFAULT_RESTART_INTENSITY_LIMIT,
240 window: DEFAULT_RESTART_INTENSITY_WINDOW,
241 }
242 }
243}
244
245impl LeanWorkerRestartPolicy {
246 #[must_use]
254 pub fn disabled() -> Self {
255 Self::default()
256 }
257
258 #[must_use]
272 pub fn memory_bounded(max_imports: u64, max_rss_kib: u64) -> Self {
273 Self::default().max_imports(max_imports).max_rss_kib(max_rss_kib)
274 }
275
276 #[must_use]
278 pub fn max_requests(mut self, limit: u64) -> Self {
279 self.max_requests = Some(limit.max(1));
280 self
281 }
282
283 #[must_use]
285 pub fn max_imports(mut self, limit: u64) -> Self {
286 self.max_imports = Some(limit.max(1));
287 self
288 }
289
290 #[must_use]
296 pub fn max_rss_kib(mut self, limit: u64) -> Self {
297 self.max_rss_kib = Some(limit.max(1));
298 self
299 }
300
301 #[must_use]
303 pub fn idle_restart_after(mut self, duration: Duration) -> Self {
304 self.idle_restart_after = Some(duration);
305 self
306 }
307
308 #[must_use]
316 pub fn max_restarts_per_window(mut self, max_restarts: u64, window: Duration) -> Self {
317 self.restart_intensity = RestartIntensityLimit {
318 max_restarts: max_restarts.max(1),
319 window: window.max(Duration::from_millis(1)),
320 };
321 self
322 }
323}
324
325#[derive(Clone, Debug, Eq, PartialEq)]
327pub enum LeanWorkerRestartReason {
328 Explicit,
330 MaxRequests { limit: u64 },
332 MaxImports { limit: u64 },
334 RssCeiling {
336 current_kib: u64,
337 limit_kib: u64,
338 last_import_stats: Option<LeanWorkerImportStats>,
339 },
340 RssHardLimit {
342 operation: &'static str,
343 current_kib: u64,
344 limit_kib: u64,
345 last_import_stats: Option<LeanWorkerImportStats>,
346 },
347 Idle { idle_for: Duration, limit: Duration },
349 Cancelled { operation: &'static str },
351 RequestTimeout {
353 operation: &'static str,
354 duration: Duration,
355 },
356 ChildAbort { operation: &'static str },
361}
362
363impl LeanWorkerRestartReason {
364 #[must_use]
370 pub const fn stable_cause(&self) -> &'static str {
371 match self {
372 Self::Explicit => "explicit",
373 Self::MaxRequests { .. } => "max_requests",
374 Self::MaxImports { .. } => "max_imports",
375 Self::RssCeiling { .. } => "rss_ceiling",
376 Self::RssHardLimit { .. } => "rss_hard_limit",
377 Self::Idle { .. } => "idle",
378 Self::Cancelled { .. } => "cancelled",
379 Self::RequestTimeout { .. } => "timeout",
380 Self::ChildAbort { .. } => "child_abort",
381 }
382 }
383}
384
/// Timing breakdown for one worker replacement.
///
/// NOTE(review): populated outside this chunk; the per-field notes below are
/// read from the field names and should be confirmed against the producer.
#[derive(Clone, Debug, Default, Eq, PartialEq)]
pub struct LeanWorkerReplacementTiming {
    /// Time spent spawning the child and completing the handshake.
    pub spawn_handshake: Duration,
    /// Time spent loading the capability.
    pub capability_load: Duration,
    /// Time spent opening the session / performing imports.
    pub session_open_import: Duration,
    /// Latency of the first command, if one was measured.
    pub first_command: Option<Duration>,
    /// Latency of a subsequent (warm) command, if one was measured.
    pub warm_command: Option<Duration>,
    /// End-to-end replacement time.
    pub replacement_total: Duration,
    /// Textual reason for the replacement (presumably a stable cause label — confirm).
    pub replacement_reason: String,
    /// Textual budget status for the replacement.
    pub replacement_budget_status: String,
}
401
/// Cumulative counters and last-observed measurements for a worker.
///
/// All counters start at zero and all `Option`s at `None` via `Default`.
/// NOTE(review): most fields are updated outside this chunk. The notes below
/// are derived from field names except where stated.
#[derive(Clone, Debug, Default, Eq, PartialEq)]
pub struct LeanWorkerStats {
    /// Requests issued.
    pub requests: u64,
    /// Imports performed (reported as `import_count` in resource facts).
    pub imports: u64,
    /// Import-like admission attempts (reported as `import_like_requests`).
    pub import_like_admission_attempts: u64,
    /// Import-like requests that were admitted.
    pub import_like_admitted: u64,
    /// RSS sampled before the most recent import-like admission, in KiB.
    pub last_import_like_rss_before_admission_kib: Option<u64>,
    /// Child exits observed.
    pub exits: u64,
    /// Restarts of any cause; `record_restart` increments this on every call.
    pub restarts: u64,
    /// Restarts caused by `LeanWorkerRestartReason::Explicit`.
    pub explicit_cycles: u64,
    /// Restarts caused by `MaxRequests`.
    pub max_request_restarts: u64,
    /// Restarts caused by `MaxImports`.
    pub max_import_restarts: u64,
    /// Restarts caused by `RssCeiling` or `RssHardLimit`.
    pub rss_restarts: u64,
    /// Restarts caused by `Idle`.
    pub idle_restarts: u64,
    /// Restarts caused by `Cancelled`.
    pub cancelled_restarts: u64,
    /// Restarts caused by `RequestTimeout`.
    pub timeout_restarts: u64,
    /// RSS samples that could not be taken.
    pub rss_samples_unavailable: u64,
    /// Most recent RSS sample, in KiB.
    pub last_rss_kib: Option<u64>,
    /// Reason passed to the most recent `record_restart`.
    pub last_restart_reason: Option<LeanWorkerRestartReason>,
    /// Replacement attempts.
    pub replacement_attempts: u64,
    /// Successful replacements.
    pub replacement_successes: u64,
    /// Failed replacements.
    pub replacement_failures: u64,
    /// Replacements admitted by the replacement budget.
    pub replacement_budget_admitted: u64,
    /// Replacements skipped by the replacement budget.
    pub replacement_budget_skipped: u64,
    /// Timing for the most recent replacement.
    pub last_replacement_timing: Option<LeanWorkerReplacementTiming>,
    /// Why the most recent replacement was skipped.
    pub last_replacement_skipped_reason: Option<String>,
    /// Spawn + handshake time of the latest spawn (set by `LeanWorker::spawn`).
    pub last_spawn_handshake_elapsed: Option<Duration>,
    /// Most recent capability-load time.
    pub last_capability_load_elapsed: Option<Duration>,
    /// Most recent session-open/import time.
    pub last_session_open_import_elapsed: Option<Duration>,
    /// Most recent first-command latency.
    pub last_first_command_elapsed: Option<Duration>,
    /// Most recent warm-command latency.
    pub last_warm_command_elapsed: Option<Duration>,
    /// Import statistics from the most recent import.
    pub last_import_stats: Option<LeanWorkerImportStats>,
    /// Streaming requests issued.
    pub stream_requests: u64,
    /// Streaming requests that succeeded.
    pub stream_successes: u64,
    /// Streaming requests that failed.
    pub stream_failures: u64,
    /// Data rows delivered to sinks.
    pub data_rows_delivered: u64,
    /// Total payload bytes of delivered data rows.
    pub data_row_payload_bytes: u64,
    /// Cumulative time spent streaming.
    pub stream_elapsed: Duration,
    /// Times a stream waited on backpressure.
    pub backpressure_waits: u64,
    /// Times backpressure led to a failure.
    pub backpressure_failures: u64,
}
482
483#[derive(Clone, Debug, Eq, PartialEq)]
491pub struct LeanWorkerLifecycleSnapshot {
492 pub worker_generation: u64,
495 pub restarts: u64,
497 pub exits: u64,
499 pub last_restart_reason: Option<LeanWorkerRestartReason>,
501 pub last_exit: Option<LeanWorkerExit>,
503 pub last_rss_kib: Option<u64>,
505 pub rss_samples_unavailable: u64,
507}
508
509impl LeanWorkerLifecycleSnapshot {
510 fn from_worker(stats: &LeanWorkerStats, last_exit: Option<LeanWorkerExit>) -> Self {
511 Self {
512 worker_generation: stats.restarts,
513 restarts: stats.restarts,
514 exits: stats.exits,
515 last_restart_reason: stats.last_restart_reason.clone(),
516 last_exit,
517 last_rss_kib: stats.last_rss_kib,
518 rss_samples_unavailable: stats.rss_samples_unavailable,
519 }
520 }
521}
522
523impl LeanWorkerStats {
524 fn record_restart(&mut self, reason: LeanWorkerRestartReason) {
525 self.restarts = self.restarts.saturating_add(1);
526 match &reason {
527 LeanWorkerRestartReason::Explicit => {
528 self.explicit_cycles = self.explicit_cycles.saturating_add(1);
529 }
530 LeanWorkerRestartReason::MaxRequests { .. } => {
531 self.max_request_restarts = self.max_request_restarts.saturating_add(1);
532 }
533 LeanWorkerRestartReason::MaxImports { .. } => {
534 self.max_import_restarts = self.max_import_restarts.saturating_add(1);
535 }
536 LeanWorkerRestartReason::RssCeiling { .. } => {
537 self.rss_restarts = self.rss_restarts.saturating_add(1);
538 }
539 LeanWorkerRestartReason::RssHardLimit { .. } => {
540 self.rss_restarts = self.rss_restarts.saturating_add(1);
541 }
542 LeanWorkerRestartReason::Idle { .. } => {
543 self.idle_restarts = self.idle_restarts.saturating_add(1);
544 }
545 LeanWorkerRestartReason::Cancelled { .. } => {
546 self.cancelled_restarts = self.cancelled_restarts.saturating_add(1);
547 }
548 LeanWorkerRestartReason::RequestTimeout { .. } => {
549 self.timeout_restarts = self.timeout_restarts.saturating_add(1);
550 }
551 LeanWorkerRestartReason::ChildAbort { .. } => {}
555 }
556 self.last_restart_reason = Some(reason);
557 }
558}
559
/// Whether the worker child is running or has exited.
#[derive(Clone, Debug, Eq, PartialEq)]
pub enum LeanWorkerStatus {
    /// The child is running.
    Running,
    /// The child exited with the attached details.
    Exited(LeanWorkerExit),
}

/// Owned, comparable summary of a child's exit.
#[derive(Clone, Debug, Eq, PartialEq)]
pub struct LeanWorkerExit {
    /// `ExitStatus::success()` of the child.
    pub success: bool,
    /// `ExitStatus::code()`; `None` when no exit code is available.
    pub code: Option<i32>,
    /// `Display` rendering of the `ExitStatus`.
    pub status: String,
    /// Diagnostic text supplied by the caller (presumably captured stderr — confirm).
    pub diagnostics: String,
}

impl LeanWorkerExit {
    /// Snapshot `status` together with the supplied `diagnostics`.
    fn from_status(status: ExitStatus, diagnostics: String) -> Self {
        let (success, code) = (status.success(), status.code());
        Self {
            status: status.to_string(),
            success,
            code,
            diagnostics,
        }
    }
}
592
/// Result of shutting a worker child down.
///
/// NOTE(review): produced outside this chunk; the field notes below follow the
/// field names and should be confirmed against the shutdown implementation.
#[derive(Clone, Debug, Eq, PartialEq)]
pub struct LeanWorkerShutdownReport {
    /// Which shutdown path was taken.
    pub outcome: LeanWorkerShutdownOutcome,
    /// The child's recorded exit.
    pub exit: LeanWorkerExit,
    /// Graceful-shutdown budget that applied.
    pub graceful_timeout: Duration,
    /// Total time spent shutting down.
    pub elapsed: Duration,
    /// Time spent in the kill step, if a kill was needed.
    pub kill_elapsed: Option<Duration>,
    /// Time spent waiting for the child to be reaped.
    pub wait_elapsed: Duration,
}

/// Path a shutdown took.
#[derive(Clone, Debug, Eq, PartialEq)]
pub enum LeanWorkerShutdownOutcome {
    /// The child had already exited before shutdown began.
    AlreadyExited,
    /// The child exited within the graceful budget.
    Graceful,
    /// The graceful attempt timed out and the child was killed.
    GracefulTimedOutKilled,
    /// The graceful attempt failed at the protocol level and the child was killed.
    GracefulProtocolFailedKilled,
    /// The child was killed without a graceful attempt.
    KillOnly,
}
624
/// Errors raised while spawning, driving, recycling, or shutting down a worker child.
///
/// The variant descriptions below mirror each variant's `Display` message.
#[derive(Debug)]
pub enum LeanWorkerError {
    /// The worker executable could not be spawned.
    Spawn {
        executable: PathBuf,
        source: std::io::Error,
    },
    /// `lean-rs-worker-child` could not be resolved.
    WorkerChildUnresolved {
        /// Every candidate path that was tried.
        tried: Vec<PathBuf>,
    },
    /// The resolved worker child at `path` is not executable.
    WorkerChildNotExecutable { path: PathBuf, reason: String },
    /// A bootstrap check identified by `code` failed.
    Bootstrap {
        code: LeanWorkerBootstrapDiagnosticCode,
        message: String,
    },
    /// Building the worker capability's Lake target failed.
    CapabilityBuild {
        diagnostic: lean_toolchain::LinkDiagnostics,
    },
    /// Setting up the child failed (e.g. a piped stdio handle was unavailable).
    Setup { message: String },
    /// The startup handshake failed.
    Handshake { message: String },
    /// A framing or protocol-level failure.
    Protocol { message: String },
    /// The worker replied with an error `code` and `message`.
    Worker { code: String, message: String },
    /// The child exited.
    ChildExited { exit: LeanWorkerExit },
    /// The child exited fatally (panic/abort or other unsuccessful exit).
    ChildPanicOrAbort { exit: LeanWorkerExit },
    /// `operation` did not complete within `duration`.
    Timeout {
        operation: &'static str,
        duration: Duration,
        resource: Box<LeanWorkerResourceExhaustedFacts>,
    },
    /// `operation` pushed RSS past the configured hard limit.
    RssHardLimitExceeded {
        operation: &'static str,
        current_kib: u64,
        limit_kib: u64,
        last_import_stats: Option<Box<LeanWorkerImportStats>>,
        resource: Box<LeanWorkerResourceExhaustedFacts>,
    },
    /// `operation` was cancelled.
    Cancelled {
        operation: &'static str,
        resource: Box<LeanWorkerResourceExhaustedFacts>,
    },
    /// A progress sink panicked.
    ProgressPanic { message: String },
    /// A data sink panicked.
    DataSinkPanic { message: String },
    /// A diagnostic sink panicked.
    DiagnosticSinkPanic { message: String },
    /// A streaming export returned a non-success `status`.
    StreamExportFailed { status: u8 },
    /// A streaming callback failed with `status`.
    StreamCallbackFailed { status: u8, description: String },
    /// A streaming export emitted a malformed row.
    StreamRowMalformed { message: String },
    /// The capability metadata export returned malformed JSON.
    CapabilityMetadataMalformed { message: String },
    /// Capability metadata from `export` differed from what was expected.
    CapabilityMetadataMismatch {
        export: String,
        expected: Box<LeanWorkerCapabilityMetadata>,
        actual: Box<LeanWorkerCapabilityMetadata>,
    },
    /// The capability doctor export returned malformed JSON.
    CapabilityDoctorMalformed { message: String },
    /// Encoding the JSON request of typed command `export` failed.
    TypedCommandRequestEncode { export: String, message: String },
    /// Decoding the JSON response of typed command `export` failed.
    TypedCommandResponseDecode { export: String, message: String },
    /// Decoding a streamed row of typed command `export` failed.
    TypedCommandRowDecode {
        export: String,
        stream: String,
        sequence: u64,
        message: String,
    },
    /// Decoding the terminal summary of typed command `export` failed.
    TypedCommandSummaryDecode { export: String, message: String },
    /// A worker pool lease was invalidated.
    LeaseInvalidated { reason: String },
    /// The worker pool cannot admit another session key.
    WorkerPoolExhausted {
        max_workers: usize,
        resource: Box<LeanWorkerResourceExhaustedFacts>,
    },
    /// The worker pool cannot admit work within its RSS budget.
    WorkerPoolMemoryBudgetExceeded {
        current_kib: u64,
        limit_kib: u64,
        last_import_stats: Option<Box<LeanWorkerImportStats>>,
        resource: Box<LeanWorkerResourceExhaustedFacts>,
    },
    /// Worker pool admission timed out after `waited`.
    WorkerPoolQueueTimeout {
        waited: Duration,
        resource: Box<LeanWorkerResourceExhaustedFacts>,
    },
    /// Too many restarts happened within `window`.
    RestartLimitExceeded { restarts: u64, window: Duration },
    /// `operation` is not supported.
    UnsupportedRequest { operation: &'static str },
    /// Waiting for the child failed.
    Wait { source: std::io::Error },
    /// Killing the child failed.
    Kill { source: std::io::Error },
    /// Waiting for the child during `operation` exceeded `duration`.
    WaitTimeout {
        operation: &'static str,
        duration: Duration,
    },
    /// `operation` was rejected because shutdown is in progress.
    ShutdownInProgress { operation: &'static str },
}
752
/// Human-readable, single-line rendering of each error variant.
///
/// Exit-bearing variants delegate to `write_exit`. RSS-related variants append
/// the output of `import_stats_diagnostic`. Boxed resource facts are not
/// rendered; use `resource_exhausted_facts` to inspect them.
impl fmt::Display for LeanWorkerError {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        match self {
            Self::Spawn { executable, source } => {
                write!(f, "failed to spawn worker {}: {source}", executable.display())
            }
            Self::WorkerChildUnresolved { tried } => {
                // Render every candidate path as one comma-separated list.
                let tried = tried
                    .iter()
                    .map(|path| path.display().to_string())
                    .collect::<Vec<_>>()
                    .join(", ");
                write!(
                    f,
                    "could not resolve lean-rs-worker-child; set LEAN_RS_WORKER_CHILD or place it beside the current executable (tried: {tried})"
                )
            }
            Self::WorkerChildNotExecutable { path, reason } => {
                write!(f, "worker child '{}' is not executable: {reason}", path.display())
            }
            Self::Bootstrap { code, message } => {
                write!(f, "worker bootstrap check {code} failed: {message}")
            }
            Self::CapabilityBuild { diagnostic } => {
                write!(f, "worker capability Lake target build failed: {diagnostic}")
            }
            Self::Setup { message } => write!(f, "worker child setup failed: {message}"),
            Self::Handshake { message } => write!(f, "worker handshake failed: {message}"),
            Self::Protocol { message } => write!(f, "worker protocol failed: {message}"),
            Self::Worker { code, message } => write!(f, "worker returned {code}: {message}"),
            Self::ChildExited { exit } => write_exit(f, "worker exited", exit),
            Self::ChildPanicOrAbort { exit } => write_exit(f, "worker exited fatally", exit),
            Self::Timeout {
                operation, duration, ..
            } => {
                write!(f, "worker operation {operation} timed out after {duration:?}")
            }
            Self::RssHardLimitExceeded {
                operation,
                current_kib,
                limit_kib,
                last_import_stats,
                ..
            } => {
                write!(
                    f,
                    "worker operation {operation} exceeded hard RSS limit; current_kib={current_kib} limit_kib={limit_kib}; {}",
                    import_stats_diagnostic(last_import_stats.as_deref())
                )
            }
            Self::Cancelled { operation, .. } => write!(f, "worker operation {operation} was cancelled"),
            Self::ProgressPanic { message } => write!(f, "worker progress sink panicked: {message}"),
            Self::DataSinkPanic { message } => write!(f, "worker data sink panicked: {message}"),
            Self::DiagnosticSinkPanic { message } => {
                write!(f, "worker diagnostic sink panicked: {message}")
            }
            Self::StreamExportFailed { status } => write!(f, "streaming export returned status {status}"),
            Self::StreamCallbackFailed { status, description } => {
                write!(f, "streaming callback failed with status {status}: {description}")
            }
            Self::StreamRowMalformed { message } => write!(f, "streaming export emitted malformed row: {message}"),
            Self::CapabilityMetadataMalformed { message } => {
                write!(f, "capability metadata export returned malformed JSON: {message}")
            }
            // Expected/actual metadata are kept on the variant but not rendered here.
            Self::CapabilityMetadataMismatch { export, .. } => {
                write!(f, "capability metadata from {export} did not match expectation")
            }
            Self::CapabilityDoctorMalformed { message } => {
                write!(f, "capability doctor export returned malformed JSON: {message}")
            }
            Self::TypedCommandRequestEncode { export, message } => {
                write!(f, "typed worker command {export} request JSON encode failed: {message}")
            }
            Self::TypedCommandResponseDecode { export, message } => {
                write!(
                    f,
                    "typed worker command {export} response JSON decode failed: {message}"
                )
            }
            Self::TypedCommandRowDecode {
                export,
                stream,
                sequence,
                message,
            } => {
                write!(
                    f,
                    "typed worker command {export} row decode failed at stream {stream} sequence {sequence}: {message}"
                )
            }
            Self::TypedCommandSummaryDecode { export, message } => {
                write!(
                    f,
                    "typed worker command {export} terminal summary decode failed: {message}"
                )
            }
            Self::LeaseInvalidated { reason } => write!(f, "worker pool lease was invalidated: {reason}"),
            Self::WorkerPoolExhausted { max_workers, .. } => {
                write!(
                    f,
                    "worker pool cannot admit another session key; max_workers={max_workers}"
                )
            }
            Self::WorkerPoolMemoryBudgetExceeded {
                current_kib,
                limit_kib,
                last_import_stats,
                ..
            } => {
                write!(
                    f,
                    "worker pool cannot admit work within RSS budget; current_kib={current_kib} limit_kib={limit_kib}; {}",
                    import_stats_diagnostic(last_import_stats.as_deref())
                )
            }
            Self::WorkerPoolQueueTimeout { waited, .. } => {
                write!(f, "worker pool admission timed out after {waited:?}")
            }
            Self::RestartLimitExceeded { restarts, window } => {
                write!(
                    f,
                    "worker restart limit exceeded after {restarts} restarts in {window:?}"
                )
            }
            Self::UnsupportedRequest { operation } => {
                write!(f, "worker operation {operation} is not supported")
            }
            Self::Wait { source } => write!(f, "failed to wait for worker child: {source}"),
            Self::Kill { source } => write!(f, "failed to kill worker child: {source}"),
            Self::WaitTimeout { operation, duration } => {
                write!(
                    f,
                    "timed out waiting for worker child during {operation} after {duration:?}"
                )
            }
            Self::ShutdownInProgress { operation } => {
                write!(
                    f,
                    "worker operation {operation} was rejected because shutdown is in progress"
                )
            }
        }
    }
}
897
898impl std::error::Error for LeanWorkerError {
899 fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
900 match self {
901 Self::Spawn { source, .. } | Self::Wait { source } | Self::Kill { source } => Some(source),
902 Self::CapabilityBuild { diagnostic } => Some(diagnostic),
903 Self::WorkerChildUnresolved { .. } | Self::WorkerChildNotExecutable { .. } | Self::Bootstrap { .. } => None,
904 Self::Setup { .. }
905 | Self::Handshake { .. }
906 | Self::Protocol { .. }
907 | Self::Worker { .. }
908 | Self::ChildExited { .. }
909 | Self::ChildPanicOrAbort { .. }
910 | Self::Timeout { .. }
911 | Self::RssHardLimitExceeded { .. }
912 | Self::Cancelled { .. }
913 | Self::ProgressPanic { .. }
914 | Self::DataSinkPanic { .. }
915 | Self::DiagnosticSinkPanic { .. }
916 | Self::StreamExportFailed { .. }
917 | Self::StreamCallbackFailed { .. }
918 | Self::StreamRowMalformed { .. }
919 | Self::CapabilityMetadataMalformed { .. }
920 | Self::CapabilityMetadataMismatch { .. }
921 | Self::CapabilityDoctorMalformed { .. }
922 | Self::TypedCommandRequestEncode { .. }
923 | Self::TypedCommandResponseDecode { .. }
924 | Self::TypedCommandRowDecode { .. }
925 | Self::TypedCommandSummaryDecode { .. }
926 | Self::LeaseInvalidated { .. }
927 | Self::WorkerPoolExhausted { .. }
928 | Self::WorkerPoolMemoryBudgetExceeded { .. }
929 | Self::WorkerPoolQueueTimeout { .. }
930 | Self::RestartLimitExceeded { .. }
931 | Self::UnsupportedRequest { .. }
932 | Self::WaitTimeout { .. }
933 | Self::ShutdownInProgress { .. } => None,
934 }
935 }
936}
937
938impl LeanWorkerError {
939 #[must_use]
943 pub fn resource_exhausted_facts(&self) -> Option<&LeanWorkerResourceExhaustedFacts> {
944 match self {
945 Self::Timeout { resource, .. }
946 | Self::RssHardLimitExceeded { resource, .. }
947 | Self::Cancelled { resource, .. }
948 | Self::WorkerPoolExhausted { resource, .. }
949 | Self::WorkerPoolMemoryBudgetExceeded { resource, .. }
950 | Self::WorkerPoolQueueTimeout { resource, .. } => Some(resource.as_ref()),
951 Self::Spawn { .. }
952 | Self::Kill { .. }
953 | Self::WorkerChildUnresolved { .. }
954 | Self::WorkerChildNotExecutable { .. }
955 | Self::Bootstrap { .. }
956 | Self::CapabilityBuild { .. }
957 | Self::Setup { .. }
958 | Self::Handshake { .. }
959 | Self::Protocol { .. }
960 | Self::Worker { .. }
961 | Self::ChildExited { .. }
962 | Self::ChildPanicOrAbort { .. }
963 | Self::ProgressPanic { .. }
964 | Self::DataSinkPanic { .. }
965 | Self::DiagnosticSinkPanic { .. }
966 | Self::StreamExportFailed { .. }
967 | Self::StreamCallbackFailed { .. }
968 | Self::StreamRowMalformed { .. }
969 | Self::CapabilityMetadataMalformed { .. }
970 | Self::CapabilityMetadataMismatch { .. }
971 | Self::CapabilityDoctorMalformed { .. }
972 | Self::TypedCommandRequestEncode { .. }
973 | Self::TypedCommandResponseDecode { .. }
974 | Self::TypedCommandRowDecode { .. }
975 | Self::TypedCommandSummaryDecode { .. }
976 | Self::LeaseInvalidated { .. }
977 | Self::RestartLimitExceeded { .. }
978 | Self::UnsupportedRequest { .. }
979 | Self::WaitTimeout { .. }
980 | Self::ShutdownInProgress { .. }
981 | Self::Wait { .. } => None,
982 }
983 }
984}
985
/// Whole milliseconds in `duration`, saturating at `u64::MAX`.
///
/// `Duration::as_millis` yields a `u128`; values that do not fit in `u64` are
/// clamped instead of truncated.
fn duration_ms(duration: Duration) -> u64 {
    let millis: u128 = duration.as_millis();
    u64::try_from(millis).unwrap_or(u64::MAX)
}
989
990#[allow(
991 clippy::too_many_arguments,
992 reason = "flat fact construction mirrors the public diagnostic payload"
993)]
994fn resource_facts(
995 cause: impl Into<String>,
996 work_entered_child: bool,
997 operation: Option<&'static str>,
998 current_rss_kib: Option<u64>,
999 limit_kib: Option<u64>,
1000 import_count: Option<u64>,
1001 worker_generation: Option<u64>,
1002 restart_reason: Option<String>,
1003 queue_wait: Option<Duration>,
1004 duration: Option<Duration>,
1005 cold_open_attempts: Option<u64>,
1006 cold_open_admitted: Option<u64>,
1007 cold_open_refusals: Option<u64>,
1008 import_like_requests: Option<u64>,
1009 import_like_admitted: Option<u64>,
1010 last_import_stats: Option<LeanWorkerImportStats>,
1011) -> LeanWorkerResourceExhaustedFacts {
1012 LeanWorkerResourceExhaustedFacts {
1013 cause: cause.into(),
1014 work_entered_child,
1015 operation: operation.map(str::to_owned),
1016 current_rss_kib,
1017 limit_kib,
1018 import_count,
1019 worker_generation,
1020 restart_reason,
1021 queue_wait_ms: queue_wait.map(duration_ms),
1022 duration_ms: duration.map(duration_ms),
1023 cold_open_attempts,
1024 cold_open_admitted,
1025 cold_open_refusals,
1026 import_like_requests,
1027 import_like_admitted,
1028 last_import_stats,
1029 }
1030}
1031
1032fn worker_resource_facts(
1033 cause: impl Into<String>,
1034 work_entered_child: bool,
1035 operation: Option<&'static str>,
1036 stats: &LeanWorkerStats,
1037 current_rss_kib: Option<u64>,
1038 limit_kib: Option<u64>,
1039 duration: Option<Duration>,
1040) -> LeanWorkerResourceExhaustedFacts {
1041 resource_facts(
1042 cause,
1043 work_entered_child,
1044 operation,
1045 current_rss_kib,
1046 limit_kib,
1047 Some(stats.imports),
1048 Some(stats.restarts),
1049 stats
1050 .last_restart_reason
1051 .as_ref()
1052 .map(LeanWorkerRestartReason::stable_cause)
1053 .map(str::to_owned),
1054 None,
1055 duration,
1056 None,
1057 None,
1058 None,
1059 Some(stats.import_like_admission_attempts),
1060 Some(stats.import_like_admitted),
1061 stats.last_import_stats.clone(),
1062 )
1063}
1064
/// Newtype counter for worker generations; starts at 0 via `Default`.
#[derive(Clone, Copy, Debug, Default, Eq, PartialEq)]
struct WorkerGeneration(u64);

impl WorkerGeneration {
    /// The following generation, pinned at `u64::MAX` instead of wrapping.
    fn next(self) -> Self {
        let Self(current) = self;
        Self(current.saturating_add(1))
    }
}
1073
/// Newtype request identifier; starts at 0 via `Default`.
#[derive(Clone, Copy, Debug, Default, Eq, PartialEq)]
struct WorkerRequestId(u64);

impl WorkerRequestId {
    /// The following identifier, pinned at `u64::MAX` instead of wrapping.
    fn next(self) -> Self {
        Self(u64::saturating_add(self.0, 1))
    }
}
1082
1083#[derive(Clone, Debug, Eq, PartialEq)]
1084struct InFlightRequest {
1085 id: WorkerRequestId,
1086 operation: &'static str,
1087 generation: WorkerGeneration,
1088}
1089
1090#[derive(Clone, Debug, Eq, PartialEq)]
1091enum WorkerSupervisorState {
1092 Idle { generation: WorkerGeneration },
1093 Busy { request: InFlightRequest },
1094 Streaming { request: InFlightRequest },
1095 Stopping { generation: WorkerGeneration },
1096 Killing { generation: WorkerGeneration },
1097 Reaping { generation: WorkerGeneration },
1098 Crashed { generation: WorkerGeneration },
1099 RestartExhausted { generation: WorkerGeneration },
1100 Exited { generation: WorkerGeneration },
1101}
1102
1103impl WorkerSupervisorState {
1104 fn rejects_new_requests(&self) -> bool {
1105 matches!(
1106 self,
1107 Self::Stopping { .. } | Self::Killing { .. } | Self::Reaping { .. } | Self::RestartExhausted { .. }
1108 )
1109 }
1110
1111 fn current_operation(&self) -> Option<&'static str> {
1112 match self {
1113 Self::Busy { request } | Self::Streaming { request } => Some(request.operation),
1114 Self::Idle { .. }
1115 | Self::Stopping { .. }
1116 | Self::Killing { .. }
1117 | Self::Reaping { .. }
1118 | Self::Crashed { .. }
1119 | Self::RestartExhausted { .. }
1120 | Self::Exited { .. } => None,
1121 }
1122 }
1123
1124 fn current_request_id(&self) -> Option<WorkerRequestId> {
1125 match self {
1126 Self::Busy { request } | Self::Streaming { request } => Some(request.id),
1127 Self::Idle { .. }
1128 | Self::Stopping { .. }
1129 | Self::Killing { .. }
1130 | Self::Reaping { .. }
1131 | Self::Crashed { .. }
1132 | Self::RestartExhausted { .. }
1133 | Self::Exited { .. } => None,
1134 }
1135 }
1136}
1137
/// Handle to one supervised worker child process.
///
/// Created by `LeanWorker::spawn`, which fills in the child, its piped stdio,
/// the handshake's runtime metadata, and fresh counters.
#[derive(Debug)]
pub struct LeanWorker {
    /// Configuration the worker was spawned with (a clone of the caller's).
    config: LeanWorkerConfig,
    /// The child process; `Option` so it can be taken out (presumably on shutdown/restart — confirm).
    child: Option<Child>,
    /// Buffered writer over the child's stdin (request channel).
    stdin: Option<BufWriter<ChildStdin>>,
    /// Buffered reader over the child's stdout (response channel).
    stdout: Option<BufReader<ChildStdout>>,
    /// The child's stderr pipe, if it was captured.
    stderr: Option<ChildStderr>,
    /// Most recent recorded exit of the child.
    last_exit: Option<LeanWorkerExit>,
    /// Metadata returned by the startup handshake.
    runtime_metadata: LeanWorkerRuntimeMetadata,
    /// Cumulative statistics.
    stats: LeanWorkerStats,
    /// Requests issued since the last (re)spawn.
    requests_since_restart: u64,
    /// Imports performed since the last (re)spawn.
    imports_since_restart: u64,
    /// Time of the last activity; initialised at spawn.
    last_activity: Instant,
    /// Current worker generation; starts at the default (0).
    generation: WorkerGeneration,
    /// Identifier to hand to the next request.
    next_request_id: WorkerRequestId,
    /// Restart timestamps, presumably used for the restart-intensity window — confirm.
    restart_window: VecDeque<Instant>,
    /// Supervisor state machine; starts as `Idle`.
    state: WorkerSupervisorState,
}
1161
1162#[allow(
1163 clippy::wildcard_enum_match_arm,
1164 reason = "Response and Message are #[non_exhaustive] across the lean-rs-worker-protocol crate boundary; every wildcard arm here uniformly converts an unexpected variant into a protocol-level error rather than enumerating each known variant"
1165)]
1166impl LeanWorker {
1167 pub fn spawn(config: &LeanWorkerConfig) -> Result<Self, LeanWorkerError> {
1175 let spawn_started = Instant::now();
1176 let mut command = Command::new(&config.executable);
1177 command
1178 .stdin(Stdio::piped())
1179 .stdout(Stdio::piped())
1180 .stderr(Stdio::piped())
1181 .env("LEAN_ABORT_ON_PANIC", "1")
1182 .env("LEAN_BACKTRACE", "0")
1183 .env("RUST_BACKTRACE", "0");
1184
1185 if let Some(current_dir) = &config.current_dir {
1186 command.current_dir(current_dir);
1187 }
1188 for (key, value) in &config.env {
1189 command.env(key, value);
1190 }
1191
1192 let mut child = command.spawn().map_err(|source| LeanWorkerError::Spawn {
1193 executable: config.executable.clone(),
1194 source,
1195 })?;
1196
1197 let mut stdin = child
1198 .stdin
1199 .take()
1200 .map(BufWriter::new)
1201 .ok_or_else(|| LeanWorkerError::Setup {
1202 message: "child stdin unavailable".to_owned(),
1203 })?;
1204 let stdout = child.stdout.take().ok_or_else(|| LeanWorkerError::Setup {
1205 message: "child stdout unavailable".to_owned(),
1206 })?;
1207 let stderr = child.stderr.take();
1208
1209 let max_frame_bytes = config.max_frame_bytes;
1210 let (sender, receiver) = mpsc::channel();
1211 let _handshake_reader = thread::spawn(move || {
1212 let mut stdout = BufReader::new(stdout);
1213 let result = expect_handshake(&mut stdout, max_frame_bytes);
1214 drop(sender.send((stdout, result)));
1215 });
1216
1217 let (stdout, runtime_metadata) = match receiver.recv_timeout(config.startup_timeout) {
1218 Ok((stdout, Ok(metadata))) => (stdout, metadata),
1219 Ok((_stdout, Err(_handshake_err))) => {
1220 drop(child.kill());
1228 let exit = wait_with_stderr(&mut child, stderr)?;
1229 return Err(if exit.success {
1230 LeanWorkerError::ChildExited { exit }
1231 } else {
1232 LeanWorkerError::ChildPanicOrAbort { exit }
1233 });
1234 }
1235 Err(mpsc::RecvTimeoutError::Timeout) => {
1236 drop(child.kill());
1237 let _exit = wait_with_stderr(&mut child, stderr)?;
1238 return Err(LeanWorkerError::Timeout {
1239 operation: "startup",
1240 duration: config.startup_timeout,
1241 resource: Box::new(resource_facts(
1242 "worker_timeout",
1243 false,
1244 Some("startup"),
1245 None,
1246 None,
1247 None,
1248 None,
1249 None,
1250 None,
1251 Some(config.startup_timeout),
1252 None,
1253 None,
1254 None,
1255 None,
1256 None,
1257 None,
1258 )),
1259 });
1260 }
1261 Err(mpsc::RecvTimeoutError::Disconnected) => {
1262 return Err(LeanWorkerError::Handshake {
1263 message: "handshake reader exited without a result".to_owned(),
1264 });
1265 }
1266 };
1267
1268 write_frame(
1272 &mut stdin,
1273 Message::ConfigureFrameLimit { max_frame_bytes },
1274 max_frame_bytes,
1275 )
1276 .map_err(|err| LeanWorkerError::Protocol {
1277 message: format!("failed to send ConfigureFrameLimit: {err}"),
1278 })?;
1279
1280 let spawn_handshake_elapsed = spawn_started.elapsed();
1281 Ok(Self {
1282 config: config.clone(),
1283 child: Some(child),
1284 stdin: Some(stdin),
1285 stdout: Some(stdout),
1286 stderr,
1287 last_exit: None,
1288 runtime_metadata,
1289 stats: LeanWorkerStats {
1290 last_spawn_handshake_elapsed: Some(spawn_handshake_elapsed),
1291 ..LeanWorkerStats::default()
1292 },
1293 requests_since_restart: 0,
1294 imports_since_restart: 0,
1295 last_activity: Instant::now(),
1296 generation: WorkerGeneration::default(),
1297 next_request_id: WorkerRequestId::default(),
1298 restart_window: VecDeque::new(),
1299 state: WorkerSupervisorState::Idle {
1300 generation: WorkerGeneration::default(),
1301 },
1302 })
1303 }
1304
1305 pub fn health(&mut self) -> Result<(), LeanWorkerError> {
1312 self.prepare_request(false)?;
1313 self.send_request(Request::Health)?;
1314 self.record_request(false);
1315 match self.read_response("health")? {
1316 Response::HealthOk => Ok(()),
1317 other => Err(unexpected_response("health", &other)),
1318 }
1319 }
1320
1321 pub fn load_fixture_capability(&mut self, fixture_root: impl AsRef<Path>) -> Result<(), LeanWorkerError> {
1332 let manifest_path = fixture_capability_manifest(fixture_root.as_ref())?;
1333 self.prepare_request(true)?;
1334 self.send_request(Request::LoadFixtureCapability {
1335 manifest_path: path_string(&manifest_path),
1336 })?;
1337 self.record_request(true);
1338 match self.read_response("load_fixture_capability")? {
1339 Response::CapabilityLoaded => Ok(()),
1340 other => Err(unexpected_response("load_fixture_capability", &other)),
1341 }
1342 }
1343
1344 pub fn call_fixture_mul(
1351 &mut self,
1352 fixture_root: impl AsRef<Path>,
1353 lhs: u64,
1354 rhs: u64,
1355 ) -> Result<u64, LeanWorkerError> {
1356 let manifest_path = fixture_capability_manifest(fixture_root.as_ref())?;
1357 self.prepare_request(true)?;
1358 self.send_request(Request::CallFixtureMul {
1359 manifest_path: path_string(&manifest_path),
1360 lhs,
1361 rhs,
1362 })?;
1363 self.record_request(true);
1364 match self.read_response("call_fixture_mul")? {
1365 Response::U64 { value } => Ok(value),
1366 other => Err(unexpected_response("call_fixture_mul", &other)),
1367 }
1368 }
1369
1370 pub fn status(&mut self) -> Result<LeanWorkerStatus, LeanWorkerError> {
1376 if let Some(exit) = &self.last_exit {
1377 self.state = WorkerSupervisorState::Exited {
1378 generation: self.generation,
1379 };
1380 return Ok(LeanWorkerStatus::Exited(exit.clone()));
1381 }
1382 let Some(child) = self.child.as_mut() else {
1383 self.state = WorkerSupervisorState::Exited {
1384 generation: self.generation,
1385 };
1386 return Ok(LeanWorkerStatus::Exited(LeanWorkerExit {
1387 success: false,
1388 code: None,
1389 status: "worker is not running".to_owned(),
1390 diagnostics: String::new(),
1391 }));
1392 };
1393 match child.try_wait().map_err(|source| LeanWorkerError::Wait { source })? {
1394 Some(status) => {
1395 let diagnostics = self.read_stderr();
1396 let exit = LeanWorkerExit::from_status(status, diagnostics);
1397 self.last_exit = Some(exit.clone());
1398 self.child = None;
1399 self.stdin = None;
1400 self.stdout = None;
1401 self.stats.exits = self.stats.exits.saturating_add(1);
1402 self.state = WorkerSupervisorState::Exited {
1403 generation: self.generation,
1404 };
1405 Ok(LeanWorkerStatus::Exited(exit))
1406 }
1407 None => Ok(LeanWorkerStatus::Running),
1408 }
1409 }
1410
1411 #[must_use]
1413 pub fn stats(&self) -> LeanWorkerStats {
1414 self.stats.clone()
1415 }
1416
1417 pub(crate) fn record_capability_open_timing(
1418 &mut self,
1419 capability_load_elapsed: Duration,
1420 session_open_import_elapsed: Duration,
1421 ) {
1422 self.stats.last_capability_load_elapsed = Some(capability_load_elapsed);
1423 self.stats.last_session_open_import_elapsed = Some(session_open_import_elapsed);
1424 if let Some(timing) = self.stats.last_replacement_timing.as_mut() {
1425 timing.capability_load = capability_load_elapsed;
1426 timing.session_open_import = session_open_import_elapsed;
1427 }
1428 }
1429
1430 pub(crate) fn record_command_timing(&mut self, first_command_after_open: bool, elapsed: Duration) {
1431 if first_command_after_open {
1432 self.stats.last_first_command_elapsed = Some(elapsed);
1433 if let Some(timing) = self.stats.last_replacement_timing.as_mut() {
1434 timing.first_command = Some(elapsed);
1435 }
1436 } else {
1437 self.stats.last_warm_command_elapsed = Some(elapsed);
1438 if let Some(timing) = self.stats.last_replacement_timing.as_mut() {
1439 timing.warm_command = Some(elapsed);
1440 }
1441 }
1442 }
1443
1444 #[must_use]
1446 pub fn lifecycle_snapshot(&self) -> LeanWorkerLifecycleSnapshot {
1447 LeanWorkerLifecycleSnapshot::from_worker(&self.stats, self.last_exit.clone())
1448 }
1449
1450 #[must_use]
1452 pub fn runtime_metadata(&self) -> LeanWorkerRuntimeMetadata {
1453 self.runtime_metadata.clone()
1454 }
1455
1456 pub fn rss_kib(&mut self) -> Option<u64> {
1462 match self.child_rss_kib() {
1463 Some(value) => {
1464 self.stats.last_rss_kib = Some(value);
1465 Some(value)
1466 }
1467 None => {
1468 self.stats.rss_samples_unavailable = self.stats.rss_samples_unavailable.saturating_add(1);
1469 None
1470 }
1471 }
1472 }
1473
1474 #[must_use]
1476 pub fn request_timeout(&self) -> Duration {
1477 self.config.request_timeout
1478 }
1479
1480 pub fn set_request_timeout(&mut self, timeout: Duration) {
1485 self.config.request_timeout = timeout;
1486 }
1487
1488 pub fn cycle(&mut self) -> Result<(), LeanWorkerError> {
1499 self.restart_with_reason(LeanWorkerRestartReason::Explicit)
1500 }
1501
1502 pub(crate) fn cycle_with_restart_reason(&mut self, reason: LeanWorkerRestartReason) -> Result<(), LeanWorkerError> {
1503 self.restart_with_reason(reason)
1504 }
1505
1506 pub fn restart(&mut self) -> Result<(), LeanWorkerError> {
1517 self.cycle()
1518 }
1519
1520 #[doc(hidden)]
1521 pub fn __kill_for_test(&mut self) -> Result<(), LeanWorkerError> {
1528 let Some(child) = self.child.as_mut() else {
1529 return Err(self.dead_error());
1530 };
1531 child.kill().map_err(|source| LeanWorkerError::Kill { source })?;
1532 Ok(())
1533 }
1534
1535 #[doc(hidden)]
1536 #[must_use]
1538 pub fn __child_pid_for_test(&self) -> Option<u32> {
1539 self.child.as_ref().map(Child::id)
1540 }
1541
1542 #[deprecated(note = "use LeanWorker::shutdown for structured shutdown status")]
1549 pub fn terminate(self) -> Result<LeanWorkerExit, LeanWorkerError> {
1550 self.shutdown().map(|report| report.exit)
1551 }
1552
1553 pub fn shutdown(mut self) -> Result<LeanWorkerShutdownReport, LeanWorkerError> {
1561 self.shutdown_child(ShutdownIntent::Graceful)
1562 }
1563
1564 #[doc(hidden)]
1565 pub fn __trigger_lean_panic_fixture(
1572 mut self,
1573 fixture_root: impl AsRef<Path>,
1574 ) -> Result<LeanWorkerExit, LeanWorkerError> {
1575 let manifest_path = fixture_capability_manifest(fixture_root.as_ref())?;
1576 self.prepare_request(true)?;
1577 self.send_request(Request::TriggerLeanPanic {
1578 manifest_path: path_string(&manifest_path),
1579 })?;
1580 self.record_request(true);
1581 match self.read_response("trigger_lean_panic") {
1582 Ok(response) => Err(unexpected_response("trigger_lean_panic", &response)),
1583 Err(LeanWorkerError::ChildPanicOrAbort { exit }) => Ok(exit),
1584 Err(err) => Err(err),
1585 }
1586 }
1587
1588 #[doc(hidden)]
1589 pub fn __emit_test_rows(
1596 &mut self,
1597 streams: Vec<String>,
1598 cancellation: Option<&LeanWorkerCancellationToken>,
1599 data: Option<&dyn LeanWorkerDataSink>,
1600 ) -> Result<u64, LeanWorkerError> {
1601 const OPERATION: &str = "emit_test_rows";
1602 check_cancelled(OPERATION, cancellation)?;
1603 self.prepare_request(false)?;
1604 self.send_request(Request::EmitTestRows { streams })?;
1605 self.record_request(false);
1606 match self.read_response_with_events(
1607 OPERATION,
1608 None,
1609 cancellation,
1610 data.map(LeanWorkerDataSinkTarget::Value),
1611 None,
1612 )? {
1613 Response::RowsComplete { count } => Ok(count),
1614 other => Err(unexpected_response(OPERATION, &other)),
1615 }
1616 }
1617
1618 #[doc(hidden)]
1619 pub fn __emit_test_rows_then_exit(
1625 &mut self,
1626 cancellation: Option<&LeanWorkerCancellationToken>,
1627 data: Option<&dyn LeanWorkerDataSink>,
1628 ) -> Result<(), LeanWorkerError> {
1629 const OPERATION: &str = "emit_test_rows_then_exit";
1630 check_cancelled(OPERATION, cancellation)?;
1631 self.prepare_request(false)?;
1632 self.send_request(Request::EmitTestRowsThenExit)?;
1633 self.record_request(false);
1634 let response = self.read_response_with_events(
1635 OPERATION,
1636 None,
1637 cancellation,
1638 data.map(LeanWorkerDataSinkTarget::Value),
1639 None,
1640 )?;
1641 Err(unexpected_response(OPERATION, &response))
1642 }
1643
1644 #[doc(hidden)]
1645 pub fn __emit_test_rows_then_panic(
1651 &mut self,
1652 cancellation: Option<&LeanWorkerCancellationToken>,
1653 data: Option<&dyn LeanWorkerDataSink>,
1654 ) -> Result<(), LeanWorkerError> {
1655 const OPERATION: &str = "emit_test_rows_then_panic";
1656 check_cancelled(OPERATION, cancellation)?;
1657 self.prepare_request(false)?;
1658 self.send_request(Request::EmitTestRowsThenPanic)?;
1659 self.record_request(false);
1660 let response = self.read_response_with_events(
1661 OPERATION,
1662 None,
1663 cancellation,
1664 data.map(LeanWorkerDataSinkTarget::Value),
1665 None,
1666 )?;
1667 Err(unexpected_response(OPERATION, &response))
1668 }
1669
1670 pub(crate) fn open_worker_session(
1671 &mut self,
1672 config: &LeanWorkerSessionConfig,
1673 cancellation: Option<&LeanWorkerCancellationToken>,
1674 progress: Option<&dyn LeanWorkerProgressSink>,
1675 ) -> Result<(), LeanWorkerError> {
1676 const OPERATION: &str = "open_worker_session";
1677 check_cancelled(OPERATION, cancellation)?;
1678 let before_restarts = self.stats.restarts;
1679 self.prepare_request(true)?;
1680 let import_started = Instant::now();
1681 let mode = match config.mode() {
1682 LeanWorkerSessionMode::Capability {
1683 package,
1684 lib_name,
1685 manifest_path,
1686 } => HostSessionMode::Capability {
1687 package: package.clone(),
1688 lib_name: lib_name.clone(),
1689 manifest_path: manifest_path.as_ref().map(|path| path_string(path)),
1690 },
1691 LeanWorkerSessionMode::ShimsOnly => HostSessionMode::ShimsOnly,
1692 };
1693 self.send_request(Request::OpenHostSession {
1694 project_root: config.project_root_string(),
1695 mode,
1696 imports: config.imports().to_vec(),
1697 import_profile: config.import_profile(),
1698 })?;
1699 self.record_request(true);
1700 match self.read_response_with_progress(OPERATION, progress, cancellation)? {
1701 Response::HostSessionOpened { import_stats } => {
1702 let session_open_import_elapsed = import_started.elapsed();
1703 self.stats.last_session_open_import_elapsed = Some(session_open_import_elapsed);
1704 if self.stats.restarts > before_restarts
1705 && let Some(timing) = self.stats.last_replacement_timing.as_mut()
1706 {
1707 timing.session_open_import = session_open_import_elapsed;
1708 }
1709 self.stats.last_import_stats = Some(import_stats);
1710 Ok(())
1711 }
1712 other => Err(unexpected_response(OPERATION, &other)),
1713 }
1714 }
1715
1716 #[expect(
1717 clippy::wildcard_enum_match_arm,
1718 reason = "round_trip deliberately collapses per-method Response wildcards into a uniform unexpected_response branch; a new variant surfaces at runtime, not compile time"
1719 )]
1720 pub(crate) fn worker_elaborate(
1721 &mut self,
1722 source: &str,
1723 options: &LeanWorkerElabOptions,
1724 cancellation: Option<&LeanWorkerCancellationToken>,
1725 progress: Option<&dyn LeanWorkerProgressSink>,
1726 ) -> Result<LeanWorkerElabResult, LeanWorkerError> {
1727 self.round_trip(
1728 "worker_elaborate",
1729 Request::Elaborate {
1730 source: source.to_owned(),
1731 options: options.clone(),
1732 },
1733 false,
1734 cancellation,
1735 progress,
1736 |response, operation| match response {
1737 Response::Elaboration { outcome } => Ok(outcome),
1738 other => Err(unexpected_response(operation, &other)),
1739 },
1740 )
1741 }
1742
1743 #[expect(
1744 clippy::wildcard_enum_match_arm,
1745 reason = "round_trip deliberately collapses per-method Response wildcards into a uniform unexpected_response branch; a new variant surfaces at runtime, not compile time"
1746 )]
1747 pub(crate) fn worker_kernel_check(
1748 &mut self,
1749 source: &str,
1750 options: &LeanWorkerElabOptions,
1751 cancellation: Option<&LeanWorkerCancellationToken>,
1752 progress: Option<&dyn LeanWorkerProgressSink>,
1753 ) -> Result<LeanWorkerKernelResult, LeanWorkerError> {
1754 self.round_trip(
1755 "worker_kernel_check",
1756 Request::KernelCheck {
1757 source: source.to_owned(),
1758 options: options.clone(),
1759 progress: progress.is_some(),
1760 },
1761 false,
1762 cancellation,
1763 progress,
1764 |response, operation| match response {
1765 Response::KernelCheck { outcome } => Ok(outcome),
1766 other => Err(unexpected_response(operation, &other)),
1767 },
1768 )
1769 }
1770
1771 #[expect(
1772 clippy::wildcard_enum_match_arm,
1773 reason = "round_trip deliberately collapses per-method Response wildcards into a uniform unexpected_response branch; a new variant surfaces at runtime, not compile time"
1774 )]
1775 pub(crate) fn worker_declaration_kinds(
1776 &mut self,
1777 names: &[&str],
1778 cancellation: Option<&LeanWorkerCancellationToken>,
1779 progress: Option<&dyn LeanWorkerProgressSink>,
1780 ) -> Result<Vec<String>, LeanWorkerError> {
1781 self.round_trip(
1782 "worker_declaration_kinds",
1783 Request::DeclarationKinds {
1784 names: names.iter().map(|name| (*name).to_owned()).collect(),
1785 progress: progress.is_some(),
1786 },
1787 false,
1788 cancellation,
1789 progress,
1790 |response, operation| match response {
1791 Response::Strings { values } => Ok(values),
1792 other => Err(unexpected_response(operation, &other)),
1793 },
1794 )
1795 }
1796
1797 #[expect(
1798 clippy::wildcard_enum_match_arm,
1799 reason = "round_trip deliberately collapses per-method Response wildcards into a uniform unexpected_response branch; a new variant surfaces at runtime, not compile time"
1800 )]
1801 pub(crate) fn worker_declaration_names(
1802 &mut self,
1803 names: &[&str],
1804 cancellation: Option<&LeanWorkerCancellationToken>,
1805 progress: Option<&dyn LeanWorkerProgressSink>,
1806 ) -> Result<Vec<String>, LeanWorkerError> {
1807 self.round_trip(
1808 "worker_declaration_names",
1809 Request::DeclarationNames {
1810 names: names.iter().map(|name| (*name).to_owned()).collect(),
1811 progress: progress.is_some(),
1812 },
1813 false,
1814 cancellation,
1815 progress,
1816 |response, operation| match response {
1817 Response::Strings { values } => Ok(values),
1818 other => Err(unexpected_response(operation, &other)),
1819 },
1820 )
1821 }
1822
1823 #[expect(
1824 clippy::wildcard_enum_match_arm,
1825 reason = "round_trip deliberately collapses per-method Response wildcards into a uniform unexpected_response branch; a new variant surfaces at runtime, not compile time"
1826 )]
1827 pub(crate) fn worker_infer_type(
1828 &mut self,
1829 source: &str,
1830 options: &LeanWorkerElabOptions,
1831 cancellation: Option<&LeanWorkerCancellationToken>,
1832 progress: Option<&dyn LeanWorkerProgressSink>,
1833 ) -> Result<LeanWorkerMetaResult<LeanWorkerRendered>, LeanWorkerError> {
1834 self.round_trip(
1835 "worker_infer_type",
1836 Request::InferType {
1837 source: source.to_owned(),
1838 options: options.clone(),
1839 },
1840 false,
1841 cancellation,
1842 progress,
1843 |response, operation| match response {
1844 Response::MetaExpr { result } => Ok(result),
1845 other => Err(unexpected_response(operation, &other)),
1846 },
1847 )
1848 }
1849
1850 #[expect(
1851 clippy::wildcard_enum_match_arm,
1852 reason = "round_trip deliberately collapses per-method Response wildcards into a uniform unexpected_response branch; a new variant surfaces at runtime, not compile time"
1853 )]
1854 pub(crate) fn worker_whnf(
1855 &mut self,
1856 source: &str,
1857 options: &LeanWorkerElabOptions,
1858 cancellation: Option<&LeanWorkerCancellationToken>,
1859 progress: Option<&dyn LeanWorkerProgressSink>,
1860 ) -> Result<LeanWorkerMetaResult<LeanWorkerRendered>, LeanWorkerError> {
1861 self.round_trip(
1862 "worker_whnf",
1863 Request::Whnf {
1864 source: source.to_owned(),
1865 options: options.clone(),
1866 },
1867 false,
1868 cancellation,
1869 progress,
1870 |response, operation| match response {
1871 Response::MetaExpr { result } => Ok(result),
1872 other => Err(unexpected_response(operation, &other)),
1873 },
1874 )
1875 }
1876
1877 #[expect(
1878 clippy::wildcard_enum_match_arm,
1879 reason = "round_trip deliberately collapses per-method Response wildcards into a uniform unexpected_response branch; a new variant surfaces at runtime, not compile time"
1880 )]
1881 pub(crate) fn worker_is_def_eq(
1882 &mut self,
1883 lhs: &str,
1884 rhs: &str,
1885 transparency: LeanWorkerMetaTransparency,
1886 options: &LeanWorkerElabOptions,
1887 cancellation: Option<&LeanWorkerCancellationToken>,
1888 progress: Option<&dyn LeanWorkerProgressSink>,
1889 ) -> Result<LeanWorkerMetaResult<bool>, LeanWorkerError> {
1890 self.round_trip(
1891 "worker_is_def_eq",
1892 Request::IsDefEq {
1893 lhs: lhs.to_owned(),
1894 rhs: rhs.to_owned(),
1895 transparency,
1896 options: options.clone(),
1897 },
1898 false,
1899 cancellation,
1900 progress,
1901 |response, operation| match response {
1902 Response::MetaBool { result } => Ok(result),
1903 other => Err(unexpected_response(operation, &other)),
1904 },
1905 )
1906 }
1907
1908 #[expect(
1909 clippy::wildcard_enum_match_arm,
1910 reason = "round_trip deliberately collapses per-method Response wildcards into a uniform unexpected_response branch; a new variant surfaces at runtime, not compile time"
1911 )]
1912 pub(crate) fn worker_describe(
1913 &mut self,
1914 name: &str,
1915 cancellation: Option<&LeanWorkerCancellationToken>,
1916 progress: Option<&dyn LeanWorkerProgressSink>,
1917 ) -> Result<Option<LeanWorkerDeclarationRow>, LeanWorkerError> {
1918 self.round_trip(
1919 "worker_describe",
1920 Request::Describe { name: name.to_owned() },
1921 false,
1922 cancellation,
1923 progress,
1924 |response, operation| match response {
1925 Response::Declaration { row } => Ok(row),
1926 other => Err(unexpected_response(operation, &other)),
1927 },
1928 )
1929 }
1930
1931 #[expect(
1932 clippy::wildcard_enum_match_arm,
1933 reason = "round_trip deliberately collapses per-method Response wildcards into a uniform unexpected_response branch; a new variant surfaces at runtime, not compile time"
1934 )]
1935 pub(crate) fn worker_search_declarations(
1936 &mut self,
1937 search: &LeanWorkerDeclarationSearch,
1938 cancellation: Option<&LeanWorkerCancellationToken>,
1939 progress: Option<&dyn LeanWorkerProgressSink>,
1940 ) -> Result<LeanWorkerDeclarationSearchResult, LeanWorkerError> {
1941 self.round_trip(
1942 "worker_search_declarations",
1943 Request::SearchDeclarations { search: search.clone() },
1944 false,
1945 cancellation,
1946 progress,
1947 |response, operation| match response {
1948 Response::DeclarationSearch { result } => Ok(result),
1949 other => Err(unexpected_response(operation, &other)),
1950 },
1951 )
1952 }
1953
1954 #[expect(
1955 clippy::wildcard_enum_match_arm,
1956 reason = "round_trip deliberately collapses per-method Response wildcards into a uniform unexpected_response branch; a new variant surfaces at runtime, not compile time"
1957 )]
1958 pub(crate) fn worker_declaration_type(
1959 &mut self,
1960 name: &str,
1961 max_bytes: usize,
1962 cancellation: Option<&LeanWorkerCancellationToken>,
1963 progress: Option<&dyn LeanWorkerProgressSink>,
1964 ) -> Result<Option<LeanWorkerDeclarationType>, LeanWorkerError> {
1965 self.round_trip(
1966 "worker_declaration_type",
1967 Request::DeclarationType {
1968 name: name.to_owned(),
1969 max_bytes,
1970 },
1971 false,
1972 cancellation,
1973 progress,
1974 |response, operation| match response {
1975 Response::DeclarationType { row } => Ok(row),
1976 other => Err(unexpected_response(operation, &other)),
1977 },
1978 )
1979 }
1980
1981 #[expect(
1982 clippy::wildcard_enum_match_arm,
1983 reason = "round_trip deliberately collapses per-method Response wildcards into a uniform unexpected_response branch; a new variant surfaces at runtime, not compile time"
1984 )]
1985 pub(crate) fn worker_inspect_declaration(
1986 &mut self,
1987 request: &LeanWorkerDeclarationInspectionRequest,
1988 cancellation: Option<&LeanWorkerCancellationToken>,
1989 progress: Option<&dyn LeanWorkerProgressSink>,
1990 ) -> Result<LeanWorkerDeclarationInspectionResult, LeanWorkerError> {
1991 self.round_trip(
1992 "worker_inspect_declaration",
1993 Request::InspectDeclaration {
1994 request: request.clone(),
1995 },
1996 false,
1997 cancellation,
1998 progress,
1999 |response, operation| match response {
2000 Response::DeclarationInspection { result } => Ok(result),
2001 other => Err(unexpected_response(operation, &other)),
2002 },
2003 )
2004 }
2005
2006 #[expect(
2007 clippy::wildcard_enum_match_arm,
2008 reason = "round_trip deliberately collapses per-method Response wildcards into a uniform unexpected_response branch; a new variant surfaces at runtime, not compile time"
2009 )]
2010 pub(crate) fn worker_attempt_proof(
2011 &mut self,
2012 request: &LeanWorkerProofAttemptRequest,
2013 options: &LeanWorkerElabOptions,
2014 cancellation: Option<&LeanWorkerCancellationToken>,
2015 progress: Option<&dyn LeanWorkerProgressSink>,
2016 ) -> Result<LeanWorkerProofAttemptResult, LeanWorkerError> {
2017 self.round_trip(
2018 "worker_attempt_proof",
2019 Request::AttemptProof {
2020 request: request.clone(),
2021 options: options.clone(),
2022 progress: progress.is_some(),
2023 },
2024 false,
2025 cancellation,
2026 progress,
2027 |response, operation| match response {
2028 Response::ProofAttempt { result } => Ok(result),
2029 other => Err(unexpected_response(operation, &other)),
2030 },
2031 )
2032 }
2033
2034 #[expect(
2035 clippy::wildcard_enum_match_arm,
2036 reason = "round_trip deliberately collapses per-method Response wildcards into a uniform unexpected_response branch; a new variant surfaces at runtime, not compile time"
2037 )]
2038 pub(crate) fn worker_verify_declaration(
2039 &mut self,
2040 request: &LeanWorkerDeclarationVerificationRequest,
2041 options: &LeanWorkerElabOptions,
2042 cancellation: Option<&LeanWorkerCancellationToken>,
2043 progress: Option<&dyn LeanWorkerProgressSink>,
2044 ) -> Result<LeanWorkerDeclarationVerificationResult, LeanWorkerError> {
2045 const OPERATION: &str = "worker_verify_declaration";
2046 let outcome = self.round_trip(
2047 OPERATION,
2048 Request::VerifyDeclaration {
2049 request: request.clone(),
2050 options: options.clone(),
2051 progress: progress.is_some(),
2052 },
2053 false,
2054 cancellation,
2055 progress,
2056 |response, operation| match response {
2057 Response::DeclarationVerification { result } => Ok(result),
2058 other => Err(unexpected_response(operation, &other)),
2059 },
2060 );
2061 match outcome {
2062 Ok(result) => Ok(result),
2063 Err(err) => {
2069 self.recover_child_abort(OPERATION, err)?;
2070 Ok(LeanWorkerDeclarationVerificationResult::Ok {
2071 verification_status: LeanWorkerDeclarationVerificationStatus::BudgetExceeded,
2072 facts: Box::new(LeanWorkerDeclarationVerificationFacts::unavailable()),
2073 imports: Vec::new(),
2074 })
2075 }
2076 }
2077 }
2078
2079 #[expect(
2080 clippy::wildcard_enum_match_arm,
2081 reason = "round_trip deliberately collapses per-method Response wildcards into a uniform unexpected_response branch; a new variant surfaces at runtime, not compile time"
2082 )]
2083 pub(crate) fn worker_verify_declaration_batch(
2084 &mut self,
2085 request: &LeanWorkerDeclarationVerificationBatchRequest,
2086 options: &LeanWorkerElabOptions,
2087 cancellation: Option<&LeanWorkerCancellationToken>,
2088 progress: Option<&dyn LeanWorkerProgressSink>,
2089 ) -> Result<LeanWorkerDeclarationVerificationBatchResult, LeanWorkerError> {
2090 const OPERATION: &str = "worker_verify_declaration_batch";
2091 let outcome = self.round_trip(
2092 OPERATION,
2093 Request::VerifyDeclarationBatch {
2094 request: request.clone(),
2095 options: options.clone(),
2096 progress: progress.is_some(),
2097 },
2098 false,
2099 cancellation,
2100 progress,
2101 |response, operation| match response {
2102 Response::DeclarationVerificationBatch { result } => Ok(result),
2103 other => Err(unexpected_response(operation, &other)),
2104 },
2105 );
2106 match outcome {
2107 Ok(result) => Ok(result),
2108 Err(err) => {
2109 self.recover_child_abort(OPERATION, err)?;
2110 Ok(degraded_declaration_verification_batch_result(request))
2111 }
2112 }
2113 }
2114
2115 #[allow(
2116 clippy::needless_pass_by_value,
2117 reason = "filter is cheap to clone, passed by value matches caller shape"
2118 )]
2119 pub(crate) fn worker_list_declarations_strings(
2120 &mut self,
2121 filter: LeanWorkerDeclarationFilter,
2122 cancellation: Option<&LeanWorkerCancellationToken>,
2123 progress: Option<&dyn LeanWorkerProgressSink>,
2124 ) -> Result<Vec<String>, LeanWorkerError> {
2125 const OPERATION: &str = "worker_list_declarations_strings";
2126 check_cancelled(OPERATION, cancellation)?;
2127 self.prepare_request(false)?;
2128 self.send_request(Request::ListDeclarationsStrings {
2129 filter,
2130 progress: progress.is_some(),
2131 })?;
2132 self.record_request(false);
2133 let collector = DeclarationNameCollector::default();
2134 let response = self.read_response_with_events(
2135 OPERATION,
2136 progress,
2137 cancellation,
2138 Some(LeanWorkerDataSinkTarget::Raw(&collector)),
2139 None,
2140 )?;
2141 if let Some(message) = collector.decode_error.lock().ok().and_then(|guard| guard.clone()) {
2142 return Err(LeanWorkerError::Protocol { message });
2143 }
2144 match response {
2145 Response::RowsComplete { count } => {
2146 let names = collector.into_inner();
2147 let observed = u64::try_from(names.len()).unwrap_or(u64::MAX);
2148 if observed != count {
2149 return Err(LeanWorkerError::Protocol {
2150 message: format!(
2151 "worker_list_declarations_strings: parent collected {observed} rows but child reported {count}"
2152 ),
2153 });
2154 }
2155 Ok(names)
2156 }
2157 other => Err(unexpected_response(OPERATION, &other)),
2158 }
2159 }
2160
2161 #[expect(
2162 clippy::wildcard_enum_match_arm,
2163 reason = "round_trip deliberately collapses per-method Response wildcards into a uniform unexpected_response branch; a new variant surfaces at runtime, not compile time"
2164 )]
2165 pub(crate) fn worker_describe_bulk(
2166 &mut self,
2167 names: &[&str],
2168 cancellation: Option<&LeanWorkerCancellationToken>,
2169 progress: Option<&dyn LeanWorkerProgressSink>,
2170 ) -> Result<Vec<LeanWorkerDeclarationRow>, LeanWorkerError> {
2171 self.round_trip(
2172 "worker_describe_bulk",
2173 Request::DescribeBulk {
2174 names: names.iter().map(|name| (*name).to_owned()).collect(),
2175 progress: progress.is_some(),
2176 },
2177 false,
2178 cancellation,
2179 progress,
2180 |response, operation| match response {
2181 Response::DeclarationBulk { rows } => Ok(rows),
2182 other => Err(unexpected_response(operation, &other)),
2183 },
2184 )
2185 }
2186
2187 #[expect(
2188 clippy::wildcard_enum_match_arm,
2189 reason = "round_trip deliberately collapses per-method Response wildcards into a uniform unexpected_response branch; a new variant surfaces at runtime, not compile time"
2190 )]
2191 pub(crate) fn worker_process_module_query(
2192 &mut self,
2193 source: &str,
2194 query: LeanWorkerModuleQuery,
2195 options: &LeanWorkerElabOptions,
2196 cancellation: Option<&LeanWorkerCancellationToken>,
2197 progress: Option<&dyn LeanWorkerProgressSink>,
2198 ) -> Result<LeanWorkerModuleQueryOutcome, LeanWorkerError> {
2199 self.round_trip(
2200 "worker_process_module_query",
2201 Request::ProcessModuleQuery {
2202 source: source.to_owned(),
2203 query,
2204 options: options.clone(),
2205 },
2206 false,
2207 cancellation,
2208 progress,
2209 |response, operation| match response {
2210 Response::ProcessModuleQuery { outcome } => Ok(outcome),
2211 other => Err(unexpected_response(operation, &other)),
2212 },
2213 )
2214 }
2215
2216 #[expect(
2217 clippy::wildcard_enum_match_arm,
2218 reason = "round_trip deliberately collapses per-method Response wildcards into a uniform unexpected_response branch; a new variant surfaces at runtime, not compile time"
2219 )]
2220 pub(crate) fn worker_process_module_query_batch(
2221 &mut self,
2222 source: &str,
2223 selectors: &[LeanWorkerModuleQuerySelector],
2224 budgets: &LeanWorkerOutputBudgets,
2225 options: &LeanWorkerElabOptions,
2226 cancellation: Option<&LeanWorkerCancellationToken>,
2227 progress: Option<&dyn LeanWorkerProgressSink>,
2228 ) -> Result<LeanWorkerModuleQueryBatchOutcome, LeanWorkerError> {
2229 const OPERATION: &str = "worker_process_module_query_batch";
2230 let outcome = self.round_trip(
2231 OPERATION,
2232 Request::ProcessModuleQueryBatch {
2233 source: source.to_owned(),
2234 selectors: selectors.to_vec(),
2235 budgets: budgets.clone(),
2236 options: options.clone(),
2237 },
2238 false,
2239 cancellation,
2240 progress,
2241 |response, operation| match response {
2242 Response::ProcessModuleQueryBatch { outcome } => Ok(outcome),
2243 other => Err(unexpected_response(operation, &other)),
2244 },
2245 );
2246 match outcome {
2247 Ok(outcome) => Ok(outcome),
2248 Err(err) => {
2251 let resource = err.resource_exhausted_facts().cloned().unwrap_or_else(|| {
2252 worker_resource_facts(
2253 "worker_child_abort",
2254 true,
2255 Some(OPERATION),
2256 &self.stats,
2257 self.stats.last_rss_kib,
2258 None,
2259 None,
2260 )
2261 });
2262 self.recover_child_abort(OPERATION, err)?;
2263 Ok(degraded_query_batch_outcome(selectors, resource))
2264 }
2265 }
2266 }
2267
2268 #[expect(
2269 clippy::wildcard_enum_match_arm,
2270 reason = "round_trip deliberately collapses per-method Response wildcards into a uniform unexpected_response branch; a new variant surfaces at runtime, not compile time"
2271 )]
2272 pub(crate) fn worker_clear_module_snapshot_cache(
2273 &mut self,
2274 cancellation: Option<&LeanWorkerCancellationToken>,
2275 progress: Option<&dyn LeanWorkerProgressSink>,
2276 ) -> Result<LeanWorkerModuleSnapshotCacheClearResult, LeanWorkerError> {
2277 self.round_trip(
2278 "worker_clear_module_snapshot_cache",
2279 Request::ClearModuleSnapshotCache,
2280 false,
2281 cancellation,
2282 progress,
2283 |response, operation| match response {
2284 Response::ModuleSnapshotCacheCleared { result } => Ok(result),
2285 other => Err(unexpected_response(operation, &other)),
2286 },
2287 )
2288 }
2289
2290 pub(crate) fn worker_run_data_stream(
2291 &mut self,
2292 export: &str,
2293 request: &serde_json::Value,
2294 rows: &dyn LeanWorkerDataSink,
2295 diagnostics: Option<&dyn LeanWorkerDiagnosticSink>,
2296 cancellation: Option<&LeanWorkerCancellationToken>,
2297 progress: Option<&dyn LeanWorkerProgressSink>,
2298 ) -> Result<LeanWorkerStreamSummary, LeanWorkerError> {
2299 self.worker_run_data_stream_with_sink(
2300 export,
2301 request,
2302 LeanWorkerDataSinkTarget::Value(rows),
2303 diagnostics,
2304 cancellation,
2305 progress,
2306 )
2307 }
2308
2309 pub(crate) fn worker_run_data_stream_raw(
2310 &mut self,
2311 export: &str,
2312 request: &serde_json::Value,
2313 rows: &dyn LeanWorkerRawDataSink,
2314 diagnostics: Option<&dyn LeanWorkerDiagnosticSink>,
2315 cancellation: Option<&LeanWorkerCancellationToken>,
2316 progress: Option<&dyn LeanWorkerProgressSink>,
2317 ) -> Result<LeanWorkerStreamSummary, LeanWorkerError> {
2318 self.worker_run_data_stream_with_sink(
2319 export,
2320 request,
2321 LeanWorkerDataSinkTarget::Raw(rows),
2322 diagnostics,
2323 cancellation,
2324 progress,
2325 )
2326 }
2327
2328 fn worker_run_data_stream_with_sink(
2329 &mut self,
2330 export: &str,
2331 request: &serde_json::Value,
2332 rows: LeanWorkerDataSinkTarget<'_>,
2333 diagnostics: Option<&dyn LeanWorkerDiagnosticSink>,
2334 cancellation: Option<&LeanWorkerCancellationToken>,
2335 progress: Option<&dyn LeanWorkerProgressSink>,
2336 ) -> Result<LeanWorkerStreamSummary, LeanWorkerError> {
2337 const OPERATION: &str = "worker_run_data_stream";
2338 check_cancelled(OPERATION, cancellation)?;
2339 let request_json = serde_json::to_string(request).map_err(|err| LeanWorkerError::Protocol {
2340 message: format!("worker data-stream request JSON encode failed: {err}"),
2341 })?;
2342 self.prepare_request(false)?;
2343 self.send_request(Request::RunDataStream {
2344 export: export.to_owned(),
2345 request_json,
2346 progress: progress.is_some(),
2347 })?;
2348 self.record_request(false);
2349 self.stats.stream_requests = self.stats.stream_requests.saturating_add(1);
2350 match self.read_response_with_events(OPERATION, progress, cancellation, Some(rows), diagnostics)? {
2351 Response::StreamComplete { summary } => Ok(summary.into()),
2352 Response::StreamExportFailed { status_byte } => {
2353 Err(LeanWorkerError::StreamExportFailed { status: status_byte })
2354 }
2355 Response::StreamCallbackFailed {
2356 status_byte,
2357 description,
2358 } => Err(LeanWorkerError::StreamCallbackFailed {
2359 status: status_byte,
2360 description,
2361 }),
2362 Response::StreamRowMalformed { message } => Err(LeanWorkerError::StreamRowMalformed { message }),
2363 other => Err(unexpected_response(OPERATION, &other)),
2364 }
2365 }
2366
2367 pub(crate) fn worker_capability_metadata(
2368 &mut self,
2369 export: &str,
2370 request: &serde_json::Value,
2371 cancellation: Option<&LeanWorkerCancellationToken>,
2372 progress: Option<&dyn LeanWorkerProgressSink>,
2373 ) -> Result<LeanWorkerCapabilityMetadata, LeanWorkerError> {
2374 let request_json = serde_json::to_string(request).map_err(|err| LeanWorkerError::Protocol {
2375 message: format!("worker capability metadata request JSON encode failed: {err}"),
2376 })?;
2377 self.round_trip(
2378 "worker_capability_metadata",
2379 Request::CapabilityMetadata {
2380 export: export.to_owned(),
2381 request_json,
2382 },
2383 false,
2384 cancellation,
2385 progress,
2386 |response, operation| match response {
2387 Response::CapabilityMetadata { metadata } => Ok(metadata),
2388 Response::CapabilityMetadataMalformed { message } => {
2389 Err(LeanWorkerError::CapabilityMetadataMalformed { message })
2390 }
2391 other => Err(unexpected_response(operation, &other)),
2392 },
2393 )
2394 }
2395
2396 #[expect(
2397 clippy::wildcard_enum_match_arm,
2398 reason = "round_trip deliberately collapses per-method Response wildcards into a uniform unexpected_response branch; a new variant surfaces at runtime, not compile time"
2399 )]
2400 pub(crate) fn worker_capability_doctor(
2401 &mut self,
2402 export: &str,
2403 request: &serde_json::Value,
2404 cancellation: Option<&LeanWorkerCancellationToken>,
2405 progress: Option<&dyn LeanWorkerProgressSink>,
2406 ) -> Result<LeanWorkerDoctorReport, LeanWorkerError> {
2407 let request_json = serde_json::to_string(request).map_err(|err| LeanWorkerError::Protocol {
2408 message: format!("worker capability doctor request JSON encode failed: {err}"),
2409 })?;
2410 self.round_trip(
2411 "worker_capability_doctor",
2412 Request::CapabilityDoctor {
2413 export: export.to_owned(),
2414 request_json,
2415 },
2416 false,
2417 cancellation,
2418 progress,
2419 |response, operation| match response {
2420 Response::CapabilityDoctor { report } => Ok(report),
2421 Response::CapabilityDoctorMalformed { message } => {
2422 Err(LeanWorkerError::CapabilityDoctorMalformed { message })
2423 }
2424 other => Err(unexpected_response(operation, &other)),
2425 },
2426 )
2427 }
2428
2429 #[expect(
2430 clippy::wildcard_enum_match_arm,
2431 reason = "round_trip deliberately collapses per-method Response wildcards into a uniform unexpected_response branch; a new variant surfaces at runtime, not compile time"
2432 )]
2433 pub(crate) fn worker_json_command(
2434 &mut self,
2435 export: &str,
2436 request_json: String,
2437 cancellation: Option<&LeanWorkerCancellationToken>,
2438 progress: Option<&dyn LeanWorkerProgressSink>,
2439 ) -> Result<String, LeanWorkerError> {
2440 self.round_trip(
2441 "worker_json_command",
2442 Request::JsonCommand {
2443 export: export.to_owned(),
2444 request_json,
2445 },
2446 false,
2447 cancellation,
2448 progress,
2449 |response, operation| match response {
2450 Response::JsonCommand { response_json } => Ok(response_json),
2451 other => Err(unexpected_response(operation, &other)),
2452 },
2453 )
2454 }
2455
2456 fn send_request(&mut self, request: Request) -> Result<(), LeanWorkerError> {
2457 self.ensure_running()?;
2458 self.write_request(request)
2459 }
2460
2461 fn write_request(&mut self, request: Request) -> Result<(), LeanWorkerError> {
2462 let max_frame_bytes = self.config.max_frame_bytes;
2463 let Some(stdin) = self.stdin.as_mut() else {
2464 return Err(self.dead_error());
2465 };
2466 write_frame(stdin, Message::Request(request), max_frame_bytes).map_err(|err| LeanWorkerError::Protocol {
2467 message: err.to_string(),
2468 })
2469 }
2470
2471 fn begin_in_flight(&mut self, operation: &'static str) -> InFlightRequest {
2472 let request = InFlightRequest {
2473 id: self.next_request_id,
2474 operation,
2475 generation: self.generation,
2476 };
2477 self.next_request_id = self.next_request_id.next();
2478 self.state = WorkerSupervisorState::Busy {
2479 request: request.clone(),
2480 };
2481 request
2482 }
2483
2484 fn mark_current_request_streaming(&mut self) {
2485 match &self.state {
2486 WorkerSupervisorState::Busy { request } | WorkerSupervisorState::Streaming { request } => {
2487 self.state = WorkerSupervisorState::Streaming {
2488 request: request.clone(),
2489 };
2490 }
2491 _ => {}
2492 }
2493 }
2494
2495 fn finish_in_flight(&mut self) {
2496 if matches!(
2497 self.state,
2498 WorkerSupervisorState::Busy { .. } | WorkerSupervisorState::Streaming { .. }
2499 ) {
2500 self.state = WorkerSupervisorState::Idle {
2501 generation: self.generation,
2502 };
2503 }
2504 }
2505
2506 fn prepare_request(&mut self, import_like: bool) -> Result<(), LeanWorkerError> {
2507 self.ensure_running()?;
2508 if import_like {
2509 self.stats.import_like_admission_attempts = self.stats.import_like_admission_attempts.saturating_add(1);
2510 self.stats.last_import_like_rss_before_admission_kib = self.child_rss_kib();
2511 }
2512
2513 if let Some(limit) = self.config.restart_policy.max_requests
2514 && self.requests_since_restart >= limit
2515 {
2516 self.restart_with_reason(LeanWorkerRestartReason::MaxRequests { limit })?;
2517 if import_like {
2518 self.stats.import_like_admitted = self.stats.import_like_admitted.saturating_add(1);
2519 }
2520 return Ok(());
2521 }
2522
2523 if import_like
2524 && let Some(limit) = self.config.restart_policy.max_imports
2525 && self.imports_since_restart >= limit
2526 {
2527 self.restart_with_reason(LeanWorkerRestartReason::MaxImports { limit })?;
2528 self.stats.import_like_admitted = self.stats.import_like_admitted.saturating_add(1);
2529 return Ok(());
2530 }
2531
2532 if let Some(limit_kib) = self.config.restart_policy.max_rss_kib {
2533 match self.child_rss_kib() {
2534 Some(current_kib) if current_kib >= limit_kib => {
2535 self.stats.last_rss_kib = Some(current_kib);
2536 self.restart_with_reason(LeanWorkerRestartReason::RssCeiling {
2537 current_kib,
2538 limit_kib,
2539 last_import_stats: self.stats.last_import_stats.clone(),
2540 })?;
2541 if import_like {
2542 self.stats.import_like_admitted = self.stats.import_like_admitted.saturating_add(1);
2543 }
2544 return Ok(());
2545 }
2546 Some(current_kib) => {
2547 self.stats.last_rss_kib = Some(current_kib);
2548 }
2549 None => {
2550 self.stats.rss_samples_unavailable = self.stats.rss_samples_unavailable.saturating_add(1);
2551 }
2552 }
2553 }
2554
2555 if let Some(limit) = self.config.restart_policy.idle_restart_after {
2556 let idle_for = self.last_activity.elapsed();
2557 if idle_for >= limit {
2558 self.restart_with_reason(LeanWorkerRestartReason::Idle { idle_for, limit })?;
2559 if import_like {
2560 self.stats.import_like_admitted = self.stats.import_like_admitted.saturating_add(1);
2561 }
2562 return Ok(());
2563 }
2564 }
2565
2566 if import_like {
2567 self.stats.import_like_admitted = self.stats.import_like_admitted.saturating_add(1);
2568 }
2569 Ok(())
2570 }
2571
2572 fn record_request(&mut self, import_like: bool) {
2573 self.stats.requests = self.stats.requests.saturating_add(1);
2574 self.requests_since_restart = self.requests_since_restart.saturating_add(1);
2575 if import_like {
2576 self.stats.imports = self.stats.imports.saturating_add(1);
2577 self.imports_since_restart = self.imports_since_restart.saturating_add(1);
2578 }
2579 self.last_activity = Instant::now();
2580 }
2581
2582 fn restart_with_reason(&mut self, reason: LeanWorkerRestartReason) -> Result<(), LeanWorkerError> {
2583 self.restart_with_reason_before_spawn(reason, || {})
2584 }
2585
    /// Stop the current child and replace `*self` with a freshly spawned worker.
    ///
    /// Steps, in order:
    /// 1. Count the replacement attempt.
    /// 2. Stop the old child: graceful for policy-driven reasons, kill-only
    ///    otherwise.
    /// 3. Run `before_spawn`.
    /// 4. Charge the restart against the restart-intensity window
    ///    (`admit_restart`).
    /// 5. Record the restart in stats.
    /// 6. Spawn a replacement from a clone of the current config.
    /// 7. Move the accumulated stats, bumped generation, request-id counter,
    ///    and restart window onto the replacement, then overwrite `self`.
    ///
    /// `before_spawn` only runs after the old child was stopped successfully.
    /// `read_response_with_events` uses it to drop its event receiver and join
    /// its reader thread; `restart_with_reason` passes a no-op.
    ///
    /// # Errors
    ///
    /// Returns the error from `shutdown_child`, `admit_restart`
    /// (`RestartLimitExceeded`), or `Self::spawn`. Before returning, it sets
    /// `last_replacement_skipped_reason` to `stop_failed`,
    /// `restart_limit_exceeded`, or `spawn_failed` respectively. On error
    /// `*self` is not replaced, although the old child may already have been
    /// stopped and the restart counters reset.
    fn restart_with_reason_before_spawn(
        &mut self,
        reason: LeanWorkerRestartReason,
        before_spawn: impl FnOnce(),
    ) -> Result<(), LeanWorkerError> {
        let config = self.config.clone();
        let replacement_started = Instant::now();
        self.stats.replacement_attempts = self.stats.replacement_attempts.saturating_add(1);
        // Policy-driven restarts ask the child to exit via `Request::Terminate`.
        // Every other reason (e.g. timeout, hard RSS limit, cancellation, child
        // abort) kills directly. The graceful path reads `self.stdout`, which is
        // not available while `read_response_with_events` has lent it to its
        // reader thread.
        let stop_intent = if matches!(
            &reason,
            LeanWorkerRestartReason::Explicit
                | LeanWorkerRestartReason::MaxRequests { .. }
                | LeanWorkerRestartReason::MaxImports { .. }
                | LeanWorkerRestartReason::RssCeiling { .. }
                | LeanWorkerRestartReason::Idle { .. }
        ) {
            ShutdownIntent::Graceful
        } else {
            ShutdownIntent::KillOnly
        };
        if let Err(err) = self.shutdown_child(stop_intent) {
            self.stats.replacement_failures = self.stats.replacement_failures.saturating_add(1);
            self.stats.last_replacement_skipped_reason = Some("stop_failed".to_owned());
            return Err(err);
        }
        // Runs after the old child is stopped; presumably this lets a reader
        // thread blocked on the old child's stdout observe EOF before it is
        // joined.
        before_spawn();
        if let Err(err) = self.admit_restart(replacement_started) {
            self.stats.replacement_budget_skipped = self.stats.replacement_budget_skipped.saturating_add(1);
            self.stats.replacement_failures = self.stats.replacement_failures.saturating_add(1);
            self.stats.last_replacement_skipped_reason = Some("restart_limit_exceeded".to_owned());
            return Err(err);
        }
        self.stats.replacement_budget_admitted = self.stats.replacement_budget_admitted.saturating_add(1);
        let next_generation = self.generation.next();
        self.stats.record_restart(reason);
        self.requests_since_restart = 0;
        self.imports_since_restart = 0;
        // Shadows the consumed `reason` with its stable cause string for the timing report.
        let reason = self
            .stats
            .last_restart_reason
            .as_ref()
            .map_or_else(|| "unknown".to_owned(), |reason| reason.stable_cause().to_owned());
        let mut next = match Self::spawn(&config) {
            Ok(next) => next,
            Err(err) => {
                self.stats.replacement_failures = self.stats.replacement_failures.saturating_add(1);
                self.stats.last_replacement_skipped_reason = Some("spawn_failed".to_owned());
                return Err(err);
            }
        };
        // Carry the request-id sequence and cumulative stats across the restart;
        // only the spawn handshake timing is taken from the fresh worker.
        let next_request_id = self.next_request_id;
        let spawn_handshake = next.stats.last_spawn_handshake_elapsed.unwrap_or_default();
        let mut stats = self.stats.clone();
        stats.replacement_successes = stats.replacement_successes.saturating_add(1);
        stats.last_replacement_skipped_reason = None;
        stats.last_spawn_handshake_elapsed = Some(spawn_handshake);
        stats.last_replacement_timing = Some(LeanWorkerReplacementTiming {
            spawn_handshake,
            capability_load: stats.last_capability_load_elapsed.unwrap_or_default(),
            session_open_import: Duration::ZERO,
            first_command: stats.last_first_command_elapsed,
            warm_command: stats.last_warm_command_elapsed,
            replacement_total: replacement_started.elapsed(),
            replacement_reason: reason,
            replacement_budget_status: "synchronous-no-overlap".to_owned(),
        });
        next.stats = stats;
        next.generation = next_generation;
        next.next_request_id = next_request_id;
        next.state = WorkerSupervisorState::Idle {
            generation: next_generation,
        };
        // Keep the restart-intensity history so the limit spans replacements.
        next.restart_window.clone_from(&self.restart_window);
        next.last_activity = Instant::now();
        *self = next;
        Ok(())
    }
2663
2664 fn admit_restart(&mut self, now: Instant) -> Result<(), LeanWorkerError> {
2665 let limit = self.config.restart_policy.restart_intensity;
2666 while self
2667 .restart_window
2668 .front()
2669 .is_some_and(|instant| now.saturating_duration_since(*instant) >= limit.window)
2670 {
2671 let _ = self.restart_window.pop_front();
2672 }
2673 let restarts = u64::try_from(self.restart_window.len()).unwrap_or(u64::MAX);
2674 if restarts >= limit.max_restarts {
2675 self.state = WorkerSupervisorState::RestartExhausted {
2676 generation: self.generation,
2677 };
2678 return Err(LeanWorkerError::RestartLimitExceeded {
2679 restarts,
2680 window: limit.window,
2681 });
2682 }
2683 self.restart_window.push_back(now);
2684 Ok(())
2685 }
2686
2687 fn recover_child_abort(&mut self, operation: &'static str, err: LeanWorkerError) -> Result<(), LeanWorkerError> {
2692 if matches!(err, LeanWorkerError::ChildPanicOrAbort { .. }) {
2693 self.restart_with_reason(LeanWorkerRestartReason::ChildAbort { operation })
2694 } else {
2695 Err(err)
2696 }
2697 }
2698
2699 fn hard_rss_limit_exceeded(&mut self) -> Option<(u64, u64)> {
2700 let limit_kib = self.config.rss_hard_limit_kib?;
2701 match self.child_rss_kib() {
2702 Some(current_kib) if current_kib >= limit_kib => {
2703 self.stats.last_rss_kib = Some(current_kib);
2704 Some((current_kib, limit_kib))
2705 }
2706 Some(current_kib) => {
2707 self.stats.last_rss_kib = Some(current_kib);
2708 None
2709 }
2710 None => {
2711 self.stats.rss_samples_unavailable = self.stats.rss_samples_unavailable.saturating_add(1);
2712 None
2713 }
2714 }
2715 }
2716
2717 fn read_response(&mut self, operation: &'static str) -> Result<Response, LeanWorkerError> {
2718 self.read_response_with_events(operation, None, None, None, None)
2719 }
2720
2721 fn read_response_with_progress(
2722 &mut self,
2723 operation: &'static str,
2724 progress: Option<&dyn LeanWorkerProgressSink>,
2725 cancellation: Option<&LeanWorkerCancellationToken>,
2726 ) -> Result<Response, LeanWorkerError> {
2727 self.read_response_with_events(operation, progress, cancellation, None, None)
2728 }
2729
2730 fn round_trip<R>(
2741 &mut self,
2742 operation: &'static str,
2743 request: Request,
2744 import_like: bool,
2745 cancellation: Option<&LeanWorkerCancellationToken>,
2746 progress: Option<&dyn LeanWorkerProgressSink>,
2747 extract: impl FnOnce(Response, &'static str) -> Result<R, LeanWorkerError>,
2748 ) -> Result<R, LeanWorkerError> {
2749 check_cancelled(operation, cancellation)?;
2750 self.prepare_request(import_like)?;
2751 self.send_request(request)?;
2752 self.record_request(import_like);
2753 let response = self.read_response_with_progress(operation, progress, cancellation)?;
2754 extract(response, operation)
2755 }
2756
2757 fn read_response_with_events(
2758 &mut self,
2759 operation: &'static str,
2760 progress: Option<&dyn LeanWorkerProgressSink>,
2761 cancellation: Option<&LeanWorkerCancellationToken>,
2762 data: Option<LeanWorkerDataSinkTarget<'_>>,
2763 diagnostics: Option<&dyn LeanWorkerDiagnosticSink>,
2764 ) -> Result<Response, LeanWorkerError> {
2765 let started = Instant::now();
2766 let timeout = self.config.request_timeout;
2767 let deadline = started.checked_add(timeout);
2768 let streaming = data.is_some();
2769 let mut request_backpressure_waits = 0_u64;
2770 let stdout = self.stdout.take().ok_or_else(|| self.dead_error())?;
2771 let max_frame_bytes = self.config.max_frame_bytes;
2772 let request = self.begin_in_flight(operation);
2773 let generation = request.generation;
2774 let request_id = Some(request.id);
2775 let (sender, receiver) = mpsc::sync_channel(WORKER_EVENT_BUFFER_CAPACITY);
2776 let reader =
2777 thread::spawn(move || read_request_messages(stdout, sender, max_frame_bytes, generation, request_id));
2778
2779 loop {
2780 if let Some((current_kib, limit_kib)) = self.hard_rss_limit_exceeded() {
2781 if streaming {
2782 self.record_stream_failure(started, request_backpressure_waits);
2783 }
2784 let last_import_stats = self.stats.last_import_stats.clone();
2785 if let Err(err) = self.restart_with_reason_before_spawn(
2786 LeanWorkerRestartReason::RssHardLimit {
2787 operation,
2788 current_kib,
2789 limit_kib,
2790 last_import_stats: last_import_stats.clone(),
2791 },
2792 || {
2793 drop(receiver);
2794 drop(reader.join());
2795 },
2796 ) {
2797 self.finish_in_flight();
2798 return Err(err);
2799 }
2800 self.finish_in_flight();
2801 return Err(LeanWorkerError::RssHardLimitExceeded {
2802 operation,
2803 current_kib,
2804 limit_kib,
2805 last_import_stats: last_import_stats.map(Box::new),
2806 resource: Box::new(worker_resource_facts(
2807 "worker_rss_hard_limit",
2808 true,
2809 Some(operation),
2810 &self.stats,
2811 Some(current_kib),
2812 Some(limit_kib),
2813 None,
2814 )),
2815 });
2816 }
2817 let event = match deadline.and_then(|deadline| deadline.checked_duration_since(Instant::now())) {
2818 Some(remaining) if remaining.is_zero() => {
2819 if streaming {
2820 self.record_stream_failure(started, request_backpressure_waits);
2821 }
2822 if let Err(err) = self.restart_with_reason_before_spawn(
2823 LeanWorkerRestartReason::RequestTimeout {
2824 operation,
2825 duration: timeout,
2826 },
2827 || {
2828 drop(receiver);
2829 drop(reader.join());
2830 },
2831 ) {
2832 self.finish_in_flight();
2833 return Err(err);
2834 }
2835 self.finish_in_flight();
2836 return Err(LeanWorkerError::Timeout {
2837 operation,
2838 duration: timeout,
2839 resource: Box::new(worker_resource_facts(
2840 "worker_timeout",
2841 true,
2842 Some(operation),
2843 &self.stats,
2844 self.stats.last_rss_kib,
2845 None,
2846 Some(timeout),
2847 )),
2848 });
2849 }
2850 Some(remaining) => {
2851 let hard_watch_enabled = self.config.rss_hard_limit_kib.is_some();
2852 let wait_for = if hard_watch_enabled {
2853 remaining.min(self.config.rss_sample_interval)
2854 } else {
2855 remaining
2856 };
2857 match receiver.recv_timeout(wait_for) {
2858 Ok(event) => event,
2859 Err(mpsc::RecvTimeoutError::Timeout) if hard_watch_enabled && wait_for < remaining => {
2860 continue;
2861 }
2862 Err(mpsc::RecvTimeoutError::Timeout) => {
2863 if streaming {
2864 self.record_stream_failure(started, request_backpressure_waits);
2865 }
2866 if let Err(err) = self.restart_with_reason_before_spawn(
2867 LeanWorkerRestartReason::RequestTimeout {
2868 operation,
2869 duration: timeout,
2870 },
2871 || {
2872 drop(receiver);
2873 drop(reader.join());
2874 },
2875 ) {
2876 self.finish_in_flight();
2877 return Err(err);
2878 }
2879 self.finish_in_flight();
2880 return Err(LeanWorkerError::Timeout {
2881 operation,
2882 duration: timeout,
2883 resource: Box::new(worker_resource_facts(
2884 "worker_timeout",
2885 true,
2886 Some(operation),
2887 &self.stats,
2888 self.stats.last_rss_kib,
2889 None,
2890 Some(timeout),
2891 )),
2892 });
2893 }
2894 Err(mpsc::RecvTimeoutError::Disconnected) => {
2895 self.finish_in_flight();
2896 drop(reader.join());
2897 return Err(LeanWorkerError::Protocol {
2898 message: "worker response reader exited without a terminal response".to_owned(),
2899 });
2900 }
2901 }
2902 }
2903 None => match receiver.recv() {
2904 Ok(event) => event,
2905 Err(_err) => {
2906 self.finish_in_flight();
2907 drop(reader.join());
2908 return Err(LeanWorkerError::Protocol {
2909 message: "worker response reader exited without a terminal response".to_owned(),
2910 });
2911 }
2912 },
2913 };
2914 if event.generation() != self.generation {
2915 let actual_generation = event.generation();
2916 self.finish_in_flight();
2917 drop(reader.join());
2918 return Err(stale_worker_output_error(operation, self.generation, actual_generation));
2919 }
2920 if let Some(actual_request_id) = event.request_id()
2921 && self.state.current_request_id() != Some(actual_request_id)
2922 {
2923 self.finish_in_flight();
2924 drop(reader.join());
2925 return Err(stale_worker_request_output_error(
2926 operation,
2927 self.state.current_request_id(),
2928 actual_request_id,
2929 ));
2930 }
2931 request_backpressure_waits = request_backpressure_waits.saturating_add(event.backpressure_waits());
2932 self.stats.backpressure_waits = self.stats.backpressure_waits.saturating_add(event.backpressure_waits());
2933
2934 let message = match event {
2935 RequestReaderEvent::Message { message, .. } => message,
2936 RequestReaderEvent::Terminal { message, stdout, .. } => {
2937 self.stdout = Some(stdout);
2938 match message {
2939 Message::Response(Response::Error { code, message }) => {
2940 self.terminalize_request_failure(streaming, started, request_backpressure_waits);
2941 drop(reader.join());
2942 return Err(LeanWorkerError::Worker { code, message });
2943 }
2944 Message::Response(response) => {
2945 let response = self.terminalize_request_response(
2946 response,
2947 streaming,
2948 started,
2949 request_backpressure_waits,
2950 );
2951 drop(reader.join());
2952 return Ok(response);
2953 }
2954 other => {
2955 self.terminalize_request_failure(streaming, started, request_backpressure_waits);
2956 drop(reader.join());
2957 return Err(LeanWorkerError::Protocol {
2958 message: format!("worker sent unexpected {operation} message: {other:?}"),
2959 });
2960 }
2961 }
2962 }
2963 RequestReaderEvent::ReadError { message, eof, .. } => {
2964 drop(reader.join());
2965 self.terminalize_request_failure(streaming, started, request_backpressure_waits);
2966 return if eof {
2967 Err(self.record_exit_error())
2968 } else {
2969 Err(LeanWorkerError::Protocol { message })
2970 };
2971 }
2972 };
2973
2974 match message {
2975 Message::ProgressTick(tick) => {
2976 self.mark_current_request_streaming();
2977 if let Err(err) =
2978 report_parent_progress(progress, elapsed_event(tick.phase, tick.current, tick.total, started))
2979 {
2980 self.terminalize_request_failure(streaming, started, request_backpressure_waits);
2981 return Err(err);
2982 }
2983 if cancellation.is_some_and(LeanWorkerCancellationToken::is_cancelled) {
2984 if streaming {
2985 self.record_stream_failure(started, request_backpressure_waits);
2986 }
2987 if let Err(err) = self.restart_with_reason_before_spawn(
2988 LeanWorkerRestartReason::Cancelled { operation },
2989 || {
2990 drop(receiver);
2991 drop(reader.join());
2992 },
2993 ) {
2994 self.finish_in_flight();
2995 return Err(err);
2996 }
2997 self.finish_in_flight();
2998 return Err(LeanWorkerError::Cancelled {
2999 operation,
3000 resource: Box::new(worker_resource_facts(
3001 "worker_cancelled",
3002 true,
3003 Some(operation),
3004 &self.stats,
3005 self.stats.last_rss_kib,
3006 None,
3007 None,
3008 )),
3009 });
3010 }
3011 }
3012 Message::DataRow(row) => {
3013 self.mark_current_request_streaming();
3014 let payload_bytes = row.payload.get().len() as u64;
3015 if let Err(err) = report_parent_data_row(data, row) {
3016 self.terminalize_request_failure(streaming, started, request_backpressure_waits);
3017 return Err(err);
3018 }
3019 self.stats.data_rows_delivered = self.stats.data_rows_delivered.saturating_add(1);
3020 self.stats.data_row_payload_bytes = self.stats.data_row_payload_bytes.saturating_add(payload_bytes);
3021 if cancellation.is_some_and(LeanWorkerCancellationToken::is_cancelled) {
3022 if streaming {
3023 self.record_stream_failure(started, request_backpressure_waits);
3024 }
3025 if let Err(err) = self.restart_with_reason_before_spawn(
3026 LeanWorkerRestartReason::Cancelled { operation },
3027 || {
3028 drop(receiver);
3029 drop(reader.join());
3030 },
3031 ) {
3032 self.finish_in_flight();
3033 return Err(err);
3034 }
3035 self.finish_in_flight();
3036 return Err(LeanWorkerError::Cancelled {
3037 operation,
3038 resource: Box::new(worker_resource_facts(
3039 "worker_cancelled",
3040 true,
3041 Some(operation),
3042 &self.stats,
3043 self.stats.last_rss_kib,
3044 None,
3045 None,
3046 )),
3047 });
3048 }
3049 }
3050 Message::Diagnostic(diagnostic) => {
3051 self.mark_current_request_streaming();
3052 if let Err(err) = report_parent_diagnostic(diagnostics, diagnostic.into()) {
3053 self.terminalize_request_failure(streaming, started, request_backpressure_waits);
3054 return Err(err);
3055 }
3056 }
3057 Message::Response(response) => {
3058 self.terminalize_request_failure(streaming, started, request_backpressure_waits);
3059 return Err(unexpected_response(operation, &response));
3060 }
3061 other => {
3062 self.terminalize_request_failure(streaming, started, request_backpressure_waits);
3063 return Err(LeanWorkerError::Protocol {
3064 message: format!("worker sent unexpected {operation} message: {other:?}"),
3065 });
3066 }
3067 }
3068 }
3069 }
3070
3071 fn ensure_running(&mut self) -> Result<(), LeanWorkerError> {
3072 if self.state.rejects_new_requests() {
3073 return Err(LeanWorkerError::ShutdownInProgress {
3074 operation: self.state.current_operation().unwrap_or("worker_request"),
3075 });
3076 }
3077 match self.status()? {
3078 LeanWorkerStatus::Running => Ok(()),
3079 LeanWorkerStatus::Exited(exit) if exit.success => Err(LeanWorkerError::ChildExited { exit }),
3080 LeanWorkerStatus::Exited(exit) => Err(LeanWorkerError::ChildPanicOrAbort { exit }),
3081 }
3082 }
3083
3084 fn terminalize_request_response(
3085 &mut self,
3086 response: Response,
3087 streaming: bool,
3088 started: Instant,
3089 backpressure_waits: u64,
3090 ) -> Response {
3091 if streaming {
3092 if matches!(
3093 response,
3094 Response::StreamComplete { .. } | Response::RowsComplete { .. }
3095 ) {
3096 self.record_stream_success(started);
3097 } else {
3098 self.record_stream_failure(started, backpressure_waits);
3099 }
3100 }
3101 self.finish_in_flight();
3102 response
3103 }
3104
    /// Close out a request that ended without a usable response.
    ///
    /// Streaming requests are also recorded as a stream failure (and as a
    /// backpressure failure when `backpressure_waits` is non-zero). Every
    /// request leaves the busy/streaming state.
    fn terminalize_request_failure(&mut self, streaming: bool, started: Instant, backpressure_waits: u64) {
        if streaming {
            self.record_stream_failure(started, backpressure_waits);
        }
        self.finish_in_flight();
    }
3111
3112 fn record_stream_success(&mut self, started: Instant) {
3113 self.stats.stream_successes = self.stats.stream_successes.saturating_add(1);
3114 self.stats.stream_elapsed = self.stats.stream_elapsed.saturating_add(started.elapsed());
3115 }
3116
3117 fn record_stream_failure(&mut self, started: Instant, backpressure_waits: u64) {
3118 self.stats.stream_failures = self.stats.stream_failures.saturating_add(1);
3119 self.stats.stream_elapsed = self.stats.stream_elapsed.saturating_add(started.elapsed());
3120 if backpressure_waits > 0 {
3121 self.stats.backpressure_failures = self.stats.backpressure_failures.saturating_add(1);
3122 }
3123 }
3124
3125 fn wait_for_exit(&mut self) -> Result<LeanWorkerExit, LeanWorkerError> {
3126 let Some(child) = self.child.as_mut() else {
3127 return Err(self.dead_error());
3128 };
3129 self.state = WorkerSupervisorState::Reaping {
3130 generation: self.generation,
3131 };
3132 let status = child.wait().map_err(|source| LeanWorkerError::Wait { source })?;
3133 Ok(self.finalize_child_exit(status))
3134 }
3135
3136 fn wait_for_exit_bounded(
3137 &mut self,
3138 operation: &'static str,
3139 timeout: Duration,
3140 ) -> Result<(LeanWorkerExit, Duration), LeanWorkerError> {
3141 let started = Instant::now();
3142 loop {
3143 let Some(child) = self.child.as_mut() else {
3144 return Err(self.dead_error());
3145 };
3146 self.state = WorkerSupervisorState::Reaping {
3147 generation: self.generation,
3148 };
3149 match child.try_wait().map_err(|source| LeanWorkerError::Wait { source })? {
3150 Some(status) => return Ok((self.finalize_child_exit(status), started.elapsed())),
3151 None if started.elapsed() >= timeout => {
3152 return Err(LeanWorkerError::WaitTimeout {
3153 operation,
3154 duration: timeout,
3155 });
3156 }
3157 None => thread::sleep(Duration::from_millis(10).min(timeout.saturating_sub(started.elapsed()))),
3158 }
3159 }
3160 }
3161
3162 fn finalize_child_exit(&mut self, status: ExitStatus) -> LeanWorkerExit {
3163 let diagnostics = self.read_stderr();
3164 let exit = LeanWorkerExit::from_status(status, diagnostics);
3165 self.last_exit = Some(exit.clone());
3166 self.child = None;
3167 self.stdin = None;
3168 self.stdout = None;
3169 self.finish_in_flight();
3170 self.state = WorkerSupervisorState::Exited {
3171 generation: self.generation,
3172 };
3173 self.stats.exits = self.stats.exits.saturating_add(1);
3174 exit
3175 }
3176
3177 fn record_exit_error(&mut self) -> LeanWorkerError {
3178 self.state = WorkerSupervisorState::Crashed {
3179 generation: self.generation,
3180 };
3181 match self.wait_for_exit() {
3182 Ok(exit) if exit.success => LeanWorkerError::ChildExited { exit },
3183 Ok(exit) => LeanWorkerError::ChildPanicOrAbort { exit },
3184 Err(err) => err,
3185 }
3186 }
3187
3188 fn shutdown_child(&mut self, intent: ShutdownIntent) -> Result<LeanWorkerShutdownReport, LeanWorkerError> {
3189 let started = Instant::now();
3190 let graceful_timeout = self.config.shutdown_timeout;
3191 match self.status()? {
3192 LeanWorkerStatus::Exited(exit) => {
3193 return Ok(LeanWorkerShutdownReport {
3194 outcome: LeanWorkerShutdownOutcome::AlreadyExited,
3195 exit,
3196 graceful_timeout,
3197 elapsed: started.elapsed(),
3198 kill_elapsed: None,
3199 wait_elapsed: Duration::ZERO,
3200 });
3201 }
3202 LeanWorkerStatus::Running => {}
3203 }
3204
3205 self.state = if intent == ShutdownIntent::Graceful {
3206 WorkerSupervisorState::Stopping {
3207 generation: self.generation,
3208 }
3209 } else {
3210 WorkerSupervisorState::Killing {
3211 generation: self.generation,
3212 }
3213 };
3214 self.finish_in_flight();
3215
3216 if intent == ShutdownIntent::Graceful {
3217 match self.write_request(Request::Terminate) {
3218 Ok(()) => return self.wait_for_graceful_shutdown(started, graceful_timeout),
3219 Err(LeanWorkerError::Protocol { .. } | LeanWorkerError::Worker { .. }) => {
3220 return self.kill_and_report(
3221 started,
3222 graceful_timeout,
3223 LeanWorkerShutdownOutcome::GracefulProtocolFailedKilled,
3224 );
3225 }
3226 Err(err) => return Err(err),
3227 }
3228 }
3229
3230 self.kill_and_report(started, graceful_timeout, LeanWorkerShutdownOutcome::KillOnly)
3231 }
3232
3233 fn wait_for_graceful_shutdown(
3234 &mut self,
3235 started: Instant,
3236 graceful_timeout: Duration,
3237 ) -> Result<LeanWorkerShutdownReport, LeanWorkerError> {
3238 let grace_started = Instant::now();
3239 let stdout = self.stdout.take().ok_or_else(|| self.dead_error())?;
3240 let max_frame_bytes = self.config.max_frame_bytes;
3241 let generation = self.generation;
3242 let (sender, receiver) = mpsc::sync_channel(WORKER_EVENT_BUFFER_CAPACITY);
3243 let reader = thread::spawn(move || read_request_messages(stdout, sender, max_frame_bytes, generation, None));
3244
3245 loop {
3246 if let Some(child) = self.child.as_mut()
3247 && let Some(status) = child.try_wait().map_err(|source| LeanWorkerError::Wait { source })?
3248 {
3249 drop(receiver);
3250 drop(reader.join());
3251 let wait_elapsed = grace_started.elapsed();
3252 let exit = self.finalize_child_exit(status);
3253 return Ok(LeanWorkerShutdownReport {
3254 outcome: LeanWorkerShutdownOutcome::Graceful,
3255 exit,
3256 graceful_timeout,
3257 elapsed: started.elapsed(),
3258 kill_elapsed: None,
3259 wait_elapsed,
3260 });
3261 }
3262
3263 let elapsed = grace_started.elapsed();
3264 if elapsed >= graceful_timeout {
3265 let kill_started = Instant::now();
3266 if let Some(child) = self.child.as_mut() {
3267 child.kill().map_err(|source| LeanWorkerError::Kill { source })?;
3268 }
3269 drop(receiver);
3270 drop(reader.join());
3271 let (exit, wait_elapsed) =
3272 self.wait_for_exit_bounded("kill_wait", LEAN_WORKER_KILL_WAIT_TIMEOUT_DEFAULT)?;
3273 return Ok(LeanWorkerShutdownReport {
3274 outcome: LeanWorkerShutdownOutcome::GracefulTimedOutKilled,
3275 exit,
3276 graceful_timeout,
3277 elapsed: started.elapsed(),
3278 kill_elapsed: Some(kill_started.elapsed()),
3279 wait_elapsed,
3280 });
3281 }
3282
3283 let receive_timeout = Duration::from_millis(10).min(graceful_timeout.saturating_sub(elapsed));
3284 match receiver.recv_timeout(receive_timeout) {
3285 Ok(RequestReaderEvent::Terminal {
3286 message: Message::Response(Response::Terminating),
3287 stdout,
3288 ..
3289 }) => {
3290 drop(reader.join());
3291 self.stdout = Some(stdout);
3292 let remaining = graceful_timeout.saturating_sub(grace_started.elapsed());
3293 match self.wait_for_exit_bounded("shutdown", remaining) {
3294 Ok((exit, wait_elapsed)) => {
3295 return Ok(LeanWorkerShutdownReport {
3296 outcome: LeanWorkerShutdownOutcome::Graceful,
3297 exit,
3298 graceful_timeout,
3299 elapsed: started.elapsed(),
3300 kill_elapsed: None,
3301 wait_elapsed,
3302 });
3303 }
3304 Err(LeanWorkerError::WaitTimeout { .. }) => {
3305 return self.kill_and_report(
3306 started,
3307 graceful_timeout,
3308 LeanWorkerShutdownOutcome::GracefulTimedOutKilled,
3309 );
3310 }
3311 Err(err) => return Err(err),
3312 }
3313 }
3314 Ok(RequestReaderEvent::Terminal { stdout, .. }) => {
3315 drop(reader.join());
3316 self.stdout = Some(stdout);
3317 return self.kill_and_report(
3318 started,
3319 graceful_timeout,
3320 LeanWorkerShutdownOutcome::GracefulProtocolFailedKilled,
3321 );
3322 }
3323 Ok(RequestReaderEvent::ReadError { eof: true, .. }) => {
3324 drop(reader.join());
3325 let remaining = graceful_timeout.saturating_sub(grace_started.elapsed());
3326 match self.wait_for_exit_bounded("shutdown", remaining) {
3327 Ok((exit, wait_elapsed)) => {
3328 return Ok(LeanWorkerShutdownReport {
3329 outcome: LeanWorkerShutdownOutcome::Graceful,
3330 exit,
3331 graceful_timeout,
3332 elapsed: started.elapsed(),
3333 kill_elapsed: None,
3334 wait_elapsed,
3335 });
3336 }
3337 Err(LeanWorkerError::WaitTimeout { .. }) => {
3338 return self.kill_and_report(
3339 started,
3340 graceful_timeout,
3341 LeanWorkerShutdownOutcome::GracefulTimedOutKilled,
3342 );
3343 }
3344 Err(err) => return Err(err),
3345 }
3346 }
3347 Ok(RequestReaderEvent::ReadError { .. }) | Err(mpsc::RecvTimeoutError::Disconnected) => {
3348 drop(reader.join());
3349 return self.kill_and_report(
3350 started,
3351 graceful_timeout,
3352 LeanWorkerShutdownOutcome::GracefulProtocolFailedKilled,
3353 );
3354 }
3355 Ok(RequestReaderEvent::Message { .. }) | Err(mpsc::RecvTimeoutError::Timeout) => {}
3356 }
3357 }
3358 }
3359
3360 fn kill_and_report(
3361 &mut self,
3362 started: Instant,
3363 graceful_timeout: Duration,
3364 outcome: LeanWorkerShutdownOutcome,
3365 ) -> Result<LeanWorkerShutdownReport, LeanWorkerError> {
3366 let kill_started = Instant::now();
3367 self.state = WorkerSupervisorState::Killing {
3368 generation: self.generation,
3369 };
3370 if let Some(child) = self.child.as_mut() {
3371 child.kill().map_err(|source| LeanWorkerError::Kill { source })?;
3372 }
3373 let (exit, wait_elapsed) = self.wait_for_exit_bounded("kill_wait", LEAN_WORKER_KILL_WAIT_TIMEOUT_DEFAULT)?;
3374 Ok(LeanWorkerShutdownReport {
3375 outcome,
3376 exit,
3377 graceful_timeout,
3378 elapsed: started.elapsed(),
3379 kill_elapsed: Some(kill_started.elapsed()),
3380 wait_elapsed,
3381 })
3382 }
3383
3384 fn dead_error(&self) -> LeanWorkerError {
3385 let exit = self.last_exit.clone().unwrap_or_else(|| LeanWorkerExit {
3386 success: false,
3387 code: None,
3388 status: "worker is not running".to_owned(),
3389 diagnostics: String::new(),
3390 });
3391 if exit.success {
3392 LeanWorkerError::ChildExited { exit }
3393 } else {
3394 LeanWorkerError::ChildPanicOrAbort { exit }
3395 }
3396 }
3397
3398 fn read_stderr(&mut self) -> String {
3399 let mut diagnostics = String::new();
3400 if let Some(mut pipe) = self.stderr.take() {
3401 drop(pipe.read_to_string(&mut diagnostics));
3402 }
3403 diagnostics
3404 }
3405
3406 fn child_rss_kib(&mut self) -> Option<u64> {
3407 let child = self.child.as_mut()?;
3408 child_rss_kib(child.id())
3409 }
3410}
3411
3412enum RequestReaderEvent {
3413 Message {
3414 generation: WorkerGeneration,
3415 request_id: Option<WorkerRequestId>,
3416 message: Message,
3417 backpressure_waits: u64,
3418 },
3419 Terminal {
3420 generation: WorkerGeneration,
3421 request_id: Option<WorkerRequestId>,
3422 message: Message,
3423 stdout: BufReader<ChildStdout>,
3424 backpressure_waits: u64,
3425 },
3426 ReadError {
3427 generation: WorkerGeneration,
3428 request_id: Option<WorkerRequestId>,
3429 message: String,
3430 eof: bool,
3431 backpressure_waits: u64,
3432 },
3433}
3434
/// How `shutdown_child` should stop the worker process.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
enum ShutdownIntent {
    /// Send `Request::Terminate` first and only kill if the worker misbehaves or times out.
    Graceful,
    /// Skip the terminate request and kill immediately.
    KillOnly,
}
3440
3441impl RequestReaderEvent {
3442 fn generation(&self) -> WorkerGeneration {
3443 match self {
3444 Self::Message { generation, .. }
3445 | Self::Terminal { generation, .. }
3446 | Self::ReadError { generation, .. } => *generation,
3447 }
3448 }
3449
3450 fn backpressure_waits(&self) -> u64 {
3451 match self {
3452 Self::Message { backpressure_waits, .. }
3453 | Self::Terminal { backpressure_waits, .. }
3454 | Self::ReadError { backpressure_waits, .. } => *backpressure_waits,
3455 }
3456 }
3457
3458 fn request_id(&self) -> Option<WorkerRequestId> {
3459 match self {
3460 Self::Message { request_id, .. }
3461 | Self::Terminal { request_id, .. }
3462 | Self::ReadError { request_id, .. } => *request_id,
3463 }
3464 }
3465
3466 fn add_backpressure_wait(&mut self) {
3467 match self {
3468 Self::Message { backpressure_waits, .. }
3469 | Self::Terminal { backpressure_waits, .. }
3470 | Self::ReadError { backpressure_waits, .. } => {
3471 *backpressure_waits = backpressure_waits.saturating_add(1);
3472 }
3473 }
3474 }
3475}
3476
3477#[allow(
3478 clippy::needless_pass_by_value,
3479 reason = "the request reader thread must own the sender"
3480)]
3481fn read_request_messages(
3482 mut stdout: BufReader<ChildStdout>,
3483 sender: mpsc::SyncSender<RequestReaderEvent>,
3484 max_frame_bytes: u32,
3485 generation: WorkerGeneration,
3486 request_id: Option<WorkerRequestId>,
3487) {
3488 loop {
3489 match read_frame(&mut stdout, max_frame_bytes) {
3490 Ok(frame) if matches!(frame.message, Message::Response(_)) => {
3491 let _ = send_reader_event(
3492 &sender,
3493 RequestReaderEvent::Terminal {
3494 generation,
3495 request_id,
3496 message: frame.message,
3497 stdout,
3498 backpressure_waits: 0,
3499 },
3500 );
3501 return;
3502 }
3503 Ok(frame) => {
3504 if send_reader_event(
3505 &sender,
3506 RequestReaderEvent::Message {
3507 generation,
3508 request_id,
3509 message: frame.message,
3510 backpressure_waits: 0,
3511 },
3512 )
3513 .is_err()
3514 {
3515 return;
3516 }
3517 }
3518 Err(err) => {
3519 let _ = send_reader_event(
3520 &sender,
3521 RequestReaderEvent::ReadError {
3522 generation,
3523 request_id,
3524 message: err.to_string(),
3525 eof: err.is_eof(),
3526 backpressure_waits: 0,
3527 },
3528 );
3529 return;
3530 }
3531 }
3532 }
3533}
3534
3535fn send_reader_event(sender: &mpsc::SyncSender<RequestReaderEvent>, event: RequestReaderEvent) -> Result<(), ()> {
3536 match sender.try_send(event) {
3537 Ok(()) => Ok(()),
3538 Err(mpsc::TrySendError::Full(mut event)) => {
3539 event.add_backpressure_wait();
3540 sender.send(event).map_err(|_| ())
3541 }
3542 Err(mpsc::TrySendError::Disconnected(_event)) => Err(()),
3543 }
3544}
3545
3546impl Drop for LeanWorker {
3547 fn drop(&mut self) {
3548 drop(self.shutdown_child(ShutdownIntent::Graceful));
3549 }
3550}
3551
3552#[allow(
3553 clippy::wildcard_enum_match_arm,
3554 reason = "Message is #[non_exhaustive] across the lean-rs-worker-protocol crate boundary; the wildcard arm uniformly rejects any non-handshake frame"
3555)]
3556fn expect_handshake(
3557 stdout: &mut BufReader<ChildStdout>,
3558 max_frame_bytes: u32,
3559) -> Result<LeanWorkerRuntimeMetadata, LeanWorkerError> {
3560 let frame = read_frame(stdout, max_frame_bytes).map_err(|err| {
3561 if err.is_eof() {
3562 LeanWorkerError::Handshake {
3563 message: "child closed stdout before handshake".to_owned(),
3564 }
3565 } else {
3566 LeanWorkerError::Handshake {
3567 message: err.to_string(),
3568 }
3569 }
3570 })?;
3571 match frame.message {
3572 Message::Handshake {
3573 worker_version,
3574 protocol_version,
3575 } if protocol_version == lean_rs_worker_protocol::protocol::PROTOCOL_VERSION => Ok(LeanWorkerRuntimeMetadata {
3576 worker_version,
3577 protocol_version,
3578 lean_version: None,
3579 }),
3580 other => Err(LeanWorkerError::Handshake {
3581 message: format!("unexpected handshake frame: {other:?}"),
3582 }),
3583 }
3584}
3585
3586fn wait_with_stderr(child: &mut Child, stderr: Option<ChildStderr>) -> Result<LeanWorkerExit, LeanWorkerError> {
3587 let status = child.wait().map_err(|source| LeanWorkerError::Wait { source })?;
3588 let mut diagnostics = String::new();
3589 if let Some(mut pipe) = stderr {
3590 drop(pipe.read_to_string(&mut diagnostics));
3591 }
3592 Ok(LeanWorkerExit::from_status(status, diagnostics))
3593}
3594
/// Upper bound, in bytes, on the stderr tail included when an exit is rendered for display.
const DISPLAY_DIAGNOSTICS_MAX_BYTES: usize = 4_096;
3600
3601fn write_exit(f: &mut fmt::Formatter<'_>, prefix: &str, exit: &LeanWorkerExit) -> fmt::Result {
3602 let tail = exit.diagnostics.trim();
3603 if tail.is_empty() {
3604 write!(f, "{prefix} with {}", exit.status)
3605 } else {
3606 let truncated = truncate_for_display(tail, DISPLAY_DIAGNOSTICS_MAX_BYTES);
3607 write!(f, "{prefix} with {}: {truncated}", exit.status)
3608 }
3609}
3610
3611fn import_stats_diagnostic(stats: Option<&LeanWorkerImportStats>) -> String {
3612 let Some(stats) = stats else {
3613 return String::from("last_import_stats=unavailable");
3614 };
3615 format!(
3616 "last_import_stats=available import_profile=level:{} import_all:{} load_exts:{} direct_import_count={} direct_imports={} effective_modules={} compacted_regions={} memory_mapped_regions={} compacted_region_bytes={} memory_mapped_region_bytes={} non_memory_mapped_region_bytes={} imported_constants={} extension_entries={}",
3617 stats.import_level,
3618 stats.import_all,
3619 stats.load_exts,
3620 stats.direct_import_names.len(),
3621 stats.direct_import_names.join(","),
3622 stats.effective_module_count,
3623 stats.compacted_region_count,
3624 stats.memory_mapped_region_count,
3625 stats.compacted_region_bytes,
3626 stats.memory_mapped_region_bytes,
3627 stats.non_memory_mapped_region_bytes,
3628 stats.imported_constant_count,
3629 stats.total_imported_extension_entries
3630 )
3631}
3632
/// Shortens `text` to at most `max_bytes` bytes for display and appends a
/// `"… (N bytes truncated)"` marker giving the number of bytes dropped.
///
/// Text that already fits is returned unchanged. The cut never splits a UTF-8 character.
/// If the kept prefix ends inside an escape sequence that starts with ESC (0x1b) and has not
/// reached a final byte yet, the cut is moved back to that ESC. This keeps a half-written
/// terminal escape out of the output.
fn truncate_for_display(text: &str, max_bytes: usize) -> String {
    const ESC: u8 = 0x1b;

    if text.len() <= max_bytes {
        return text.to_owned();
    }

    // Largest index <= `from` that lies on a char boundary (index 0 always qualifies).
    let floor_boundary = |from: usize| {
        let mut idx = from;
        while idx > 0 && !text.is_char_boundary(idx) {
            idx = idx.saturating_sub(1);
        }
        idx
    };

    let raw = text.as_bytes();
    let mut end = floor_boundary(max_bytes);

    let last_esc = raw
        .get(..end)
        .and_then(|head| head.iter().rposition(|&byte| byte == ESC));
    if let Some(esc) = last_esc {
        // Skip the ESC and the byte after it (e.g. the `[` of a CSI), then look for a final
        // byte in 0x40..=0x7e, excluding `[` (0x5b).
        let body_start = esc.saturating_add(2).min(end);
        let has_final_byte = raw
            .get(body_start..end)
            .is_some_and(|body| body.iter().any(|&byte| matches!(byte, 0x40..=0x5a | 0x5c..=0x7e)));
        if !has_final_byte {
            end = esc;
        }
    }

    let end = floor_boundary(end);
    let truncated_bytes = text.len().saturating_sub(end);
    let kept = text.get(..end).unwrap_or("");
    format!("{kept}… ({truncated_bytes} bytes truncated)")
}
3678
3679fn degraded_query_batch_outcome(
3683 selectors: &[LeanWorkerModuleQuerySelector],
3684 resource: LeanWorkerResourceExhaustedFacts,
3685) -> LeanWorkerModuleQueryBatchOutcome {
3686 let items = selectors
3687 .iter()
3688 .map(|selector| LeanWorkerModuleQueryBatchItem::BudgetExceeded {
3689 id: selector.id().to_owned(),
3690 message: "worker aborted during module query; result degraded under resource pressure".to_owned(),
3691 })
3692 .collect();
3693 LeanWorkerModuleQueryBatchOutcome::Ok {
3694 result: LeanWorkerModuleQueryBatchEnvelope {
3695 items,
3696 total_truncated: false,
3697 },
3698 imports: Vec::new(),
3699 facts: LeanWorkerModuleQueryCacheFacts {
3700 resource: Some(Box::new(resource)),
3701 ..LeanWorkerModuleQueryCacheFacts::uncached(0)
3702 },
3703 }
3704}
3705
3706fn degraded_declaration_verification_batch_result(
3707 request: &LeanWorkerDeclarationVerificationBatchRequest,
3708) -> LeanWorkerDeclarationVerificationBatchResult {
3709 LeanWorkerDeclarationVerificationBatchResult::Ok {
3710 results: request
3711 .targets
3712 .iter()
3713 .map(|target| LeanWorkerDeclarationVerificationBatchRow {
3714 id: target.id.clone(),
3715 target: target.target.clone(),
3716 verification_status: LeanWorkerDeclarationVerificationStatus::BudgetExceeded,
3717 facts: Box::new(LeanWorkerDeclarationVerificationFacts::unavailable()),
3718 })
3719 .collect(),
3720 imports: Vec::new(),
3721 }
3722}
3723
3724fn unexpected_response(operation: &'static str, response: &Response) -> LeanWorkerError {
3725 LeanWorkerError::Protocol {
3726 message: format!("worker sent unexpected {operation} response: {response:?}"),
3727 }
3728}
3729
3730fn stale_worker_output_error(
3731 operation: &'static str,
3732 expected: WorkerGeneration,
3733 actual: WorkerGeneration,
3734) -> LeanWorkerError {
3735 LeanWorkerError::Protocol {
3736 message: format!(
3737 "worker sent stale {operation} frame from generation {}, current generation is {}",
3738 actual.0, expected.0
3739 ),
3740 }
3741}
3742
3743fn stale_worker_request_output_error(
3744 operation: &'static str,
3745 expected: Option<WorkerRequestId>,
3746 actual: WorkerRequestId,
3747) -> LeanWorkerError {
3748 let expected = expected.map_or_else(|| "none".to_owned(), |request_id| request_id.0.to_string());
3749 LeanWorkerError::Protocol {
3750 message: format!(
3751 "worker sent stale {operation} frame from request {}, current request is {expected}",
3752 actual.0
3753 ),
3754 }
3755}
3756
/// Converts `path` to an owned `String`, replacing non-UTF-8 sequences with U+FFFD.
fn path_string(path: &Path) -> String {
    String::from(path.to_string_lossy())
}
3760
3761fn fixture_capability_manifest(fixture_root: &Path) -> Result<PathBuf, LeanWorkerError> {
3762 let built = lean_toolchain::CargoLeanCapability::new(fixture_root, "LeanRsFixture")
3763 .package("lean_rs_fixture")
3764 .module("LeanRsFixture")
3765 .export_signature(fixture_mul_signature("lean_rs_fixture_u64_mul"))
3766 .export_signature(fixture_panic_signature("lean_rs_fixture_panic_unit"))
3767 .build_quiet()
3768 .map_err(|diagnostic| LeanWorkerError::CapabilityBuild { diagnostic })?;
3769 Ok(built.manifest_path().to_path_buf())
3770}
3771
3772#[derive(Debug, Default)]
3780struct DeclarationNameCollector {
3781 names: Mutex<Vec<String>>,
3782 decode_error: Mutex<Option<String>>,
3783}
3784
3785impl DeclarationNameCollector {
3786 fn into_inner(self) -> Vec<String> {
3787 self.names.into_inner().unwrap_or_default()
3788 }
3789}
3790
3791impl LeanWorkerRawDataSink for DeclarationNameCollector {
3792 fn report(&self, row: LeanWorkerRawDataRow) {
3793 match serde_json::from_str::<String>(row.payload.get()) {
3794 Ok(name) => {
3795 if let Ok(mut guard) = self.names.lock() {
3796 guard.push(name);
3797 }
3798 }
3799 Err(err) => {
3800 if let Ok(mut slot) = self.decode_error.lock()
3801 && slot.is_none()
3802 {
3803 *slot = Some(format!("list_declarations_strings row payload decode failed: {err}"));
3804 }
3805 }
3806 }
3807 }
3808}
3809
/// Linux RSS probe: reads the `VmRSS:` line of `/proc/<pid>/status` and returns its value.
///
/// Returns `None` when the file cannot be read or no `VmRSS:` line parses as a number. The
/// value is taken as printed; the kernel reports it in kB.
#[cfg(target_os = "linux")]
fn child_rss_kib(pid: u32) -> Option<u64> {
    let status_path = format!("/proc/{pid}/status");
    let contents = std::fs::read_to_string(status_path).ok()?;
    contents
        .lines()
        .filter_map(|line| line.strip_prefix("VmRSS:"))
        .find_map(|value| value.split_whitespace().next()?.parse::<u64>().ok())
}
3818
/// Portable RSS probe for non-Linux targets: runs `ps -o rss= -p <pid>` and parses the output.
///
/// Returns `None` in three cases:
/// - `ps` cannot be run or exits unsuccessfully;
/// - its trimmed output is not an integer;
/// - the reported value is zero.
#[cfg(not(target_os = "linux"))]
fn child_rss_kib(pid: u32) -> Option<u64> {
    let pid_arg = pid.to_string();
    let Ok(output) = Command::new("ps").args(["-o", "rss=", "-p", pid_arg.as_str()]).output() else {
        return None;
    };
    if output.status.success() {
        String::from_utf8_lossy(&output.stdout)
            .trim()
            .parse::<u64>()
            .ok()
            .filter(|&kib| kib > 0)
    } else {
        None
    }
}
3831
// Unit tests for the pure helpers in this file:
// - frame-size clamping;
// - restart/RSS policy clamping;
// - stale-frame error messages and lifecycle snapshots;
// - degraded batch outcomes;
// - the `Display` / truncation path for child-exit errors.
// None of the tests below spawns a worker process.
#[cfg(test)]
// The tests assert via `expect`/`panic!` and use catch-all `other =>` arms when matching
// error and outcome enums, so these lints are relaxed for the module.
#[allow(clippy::expect_used, clippy::panic, clippy::wildcard_enum_match_arm)]
mod tests {
    use super::{
        DISPLAY_DIAGNOSTICS_MAX_BYTES, LeanWorkerConfig, LeanWorkerDeclarationVerificationBatchResult,
        LeanWorkerDeclarationVerificationFacts, LeanWorkerDeclarationVerificationStatus, LeanWorkerError,
        LeanWorkerExit, LeanWorkerLifecycleSnapshot, LeanWorkerModuleQueryBatchItem, LeanWorkerModuleQueryBatchOutcome,
        LeanWorkerModuleQuerySelector, LeanWorkerRestartPolicy, LeanWorkerRestartReason, LeanWorkerStats,
        MAX_FRAME_BYTES, MAX_FRAME_BYTES_HARD_CAP, MIN_FRAME_BYTES, WorkerGeneration, WorkerRequestId,
        stale_worker_output_error, stale_worker_request_output_error, truncate_for_display,
    };
    use lean_rs_worker_protocol::types::{
        LeanWorkerDeclarationVerificationBatchItem, LeanWorkerDeclarationVerificationBatchRequest,
        LeanWorkerDeclarationVerificationTarget, LeanWorkerOutputBudgets, LeanWorkerSorryPolicy,
    };
    use std::path::PathBuf;
    use std::time::Duration;

    // Config pointing at a binary path that does not exist; the tests only inspect its fields.
    fn dummy_config() -> LeanWorkerConfig {
        LeanWorkerConfig::new(PathBuf::from("/nonexistent/lean-rs-worker-child"))
    }

    // Fabricates an exit record: code 0 / "exit status: 0" on success, else code 1 /
    // "exit status: 1", with the given stderr diagnostics.
    fn exit_with(diagnostics: &str, success: bool) -> LeanWorkerExit {
        let (code, status) = if success {
            (0_i32, "exit status: 0".to_owned())
        } else {
            (1_i32, "exit status: 1".to_owned())
        };
        LeanWorkerExit {
            success,
            code: Some(code),
            status,
            diagnostics: diagnostics.to_owned(),
        }
    }

    // A fresh config uses the protocol's default `MAX_FRAME_BYTES`.
    #[test]
    fn max_frame_bytes_default_matches_legacy_cap() {
        let config = dummy_config();
        assert_eq!(config.max_frame_bytes, MAX_FRAME_BYTES);
    }

    // Requests below the floor are raised to `MIN_FRAME_BYTES`.
    #[test]
    fn max_frame_bytes_clamps_below_floor() {
        let config = dummy_config().max_frame_bytes(1024);
        assert_eq!(config.max_frame_bytes, MIN_FRAME_BYTES);
    }

    // Requests above the ceiling are lowered to `MAX_FRAME_BYTES_HARD_CAP`.
    #[test]
    fn max_frame_bytes_clamps_above_ceiling() {
        let config = dummy_config().max_frame_bytes(u32::MAX);
        assert_eq!(config.max_frame_bytes, MAX_FRAME_BYTES_HARD_CAP);
    }

    // In-range values are stored unchanged.
    #[test]
    fn max_frame_bytes_passes_through_in_range() {
        let config = dummy_config().max_frame_bytes(8 * 1024 * 1024);
        assert_eq!(config.max_frame_bytes, 8 * 1024 * 1024);
    }

    // A zero RSS limit and zero sample interval are clamped to 1 KiB and 1 ms.
    #[test]
    fn rss_hard_limit_config_clamps_to_nonzero_policy() {
        let config = dummy_config().rss_hard_limit(0, Duration::ZERO);
        assert_eq!(config.rss_hard_limit_kib, Some(1));
        assert_eq!(config.rss_sample_interval, Duration::from_millis(1));
    }

    // A zero restart budget and zero window are clamped to 1 restart per 1 ms.
    #[test]
    fn restart_intensity_policy_clamps_to_nonzero_window() {
        let policy = LeanWorkerRestartPolicy::default().max_restarts_per_window(0, Duration::ZERO);
        assert_eq!(policy.restart_intensity.max_restarts, 1);
        assert_eq!(policy.restart_intensity.window, Duration::from_millis(1));
    }

    // Stale-generation frames become protocol errors naming both generations.
    #[test]
    fn conformance_stale_generation_output_is_protocol_failure() {
        let err = stale_worker_output_error("health", WorkerGeneration(2), WorkerGeneration(1));
        match err {
            LeanWorkerError::Protocol { message } => {
                assert!(message.contains("stale health frame"));
                assert!(message.contains("generation 1"));
                assert!(message.contains("current generation is 2"));
            }
            other => panic!("expected protocol error, got {other:?}"),
        }
    }

    // Stale-request frames become protocol errors naming both request ids.
    #[test]
    fn conformance_stale_request_output_is_protocol_failure() {
        let err = stale_worker_request_output_error("emit_test_rows", Some(WorkerRequestId(2)), WorkerRequestId(1));
        match err {
            LeanWorkerError::Protocol { message } => {
                assert!(message.contains("stale emit_test_rows frame"));
                assert!(message.contains("request 1"));
                assert!(message.contains("current request is 2"));
            }
            other => panic!("expected protocol error, got {other:?}"),
        }
    }

    // The lifecycle snapshot mirrors the stats counters; `worker_generation` equals the
    // restart count in this fixture.
    #[test]
    fn lifecycle_snapshot_exposes_restart_generation() {
        let stats = LeanWorkerStats {
            restarts: 3,
            exits: 2,
            last_restart_reason: Some(LeanWorkerRestartReason::RequestTimeout {
                operation: "test",
                duration: std::time::Duration::from_secs(1),
            }),
            last_rss_kib: Some(42),
            rss_samples_unavailable: 1,
            ..LeanWorkerStats::default()
        };
        let exit = exit_with("bye", false);
        let snapshot = LeanWorkerLifecycleSnapshot::from_worker(&stats, Some(exit.clone()));
        assert_eq!(snapshot.worker_generation, 3);
        assert_eq!(snapshot.restarts, 3);
        assert_eq!(snapshot.exits, 2);
        assert_eq!(snapshot.last_exit, Some(exit));
        assert_eq!(snapshot.last_rss_kib, Some(42));
        assert_eq!(snapshot.rss_samples_unavailable, 1);
    }

    // Pins the stable cause string for each restart reason variant exercised here.
    #[test]
    fn restart_reason_exposes_stable_policy_cause() {
        assert_eq!(LeanWorkerRestartReason::Explicit.stable_cause(), "explicit");
        assert_eq!(
            LeanWorkerRestartReason::MaxRequests { limit: 1 }.stable_cause(),
            "max_requests"
        );
        assert_eq!(
            LeanWorkerRestartReason::MaxImports { limit: 1 }.stable_cause(),
            "max_imports"
        );
        assert_eq!(
            LeanWorkerRestartReason::RssCeiling {
                current_kib: 2,
                limit_kib: 1,
                last_import_stats: None,
            }
            .stable_cause(),
            "rss_ceiling"
        );
        assert_eq!(
            LeanWorkerRestartReason::RssHardLimit {
                operation: "test",
                current_kib: 2,
                limit_kib: 1,
                last_import_stats: None,
            }
            .stable_cause(),
            "rss_hard_limit"
        );
        assert_eq!(
            LeanWorkerRestartReason::RequestTimeout {
                operation: "test",
                duration: Duration::from_millis(1),
            }
            .stable_cause(),
            "timeout"
        );
        assert_eq!(
            LeanWorkerRestartReason::ChildAbort { operation: "test" }.stable_cause(),
            "child_abort"
        );
    }

    // Every selector in a degraded batch is reported as `BudgetExceeded`, in selector order,
    // with the resource facts attached and no imports.
    #[test]
    fn degraded_query_batch_outcome_marks_every_selector_budget_exceeded() {
        let selectors = vec![
            LeanWorkerModuleQuerySelector::ProofState {
                id: "a".to_owned(),
                line: 1,
                column: 1,
            },
            LeanWorkerModuleQuerySelector::Diagnostics { id: "b".to_owned() },
        ];
        let resource = super::resource_facts(
            "worker_child_abort",
            true,
            Some("worker_process_module_query_batch"),
            None,
            None,
            Some(1),
            Some(2),
            Some("child_abort".to_owned()),
            None,
            None,
            None,
            None,
            None,
            None,
            None,
            None,
        );
        let LeanWorkerModuleQueryBatchOutcome::Ok { result, imports, facts } =
            super::degraded_query_batch_outcome(&selectors, resource.clone())
        else {
            panic!("degraded outcome should be Ok with per-selector items");
        };
        assert!(imports.is_empty());
        assert_eq!(facts.resource.as_deref(), Some(&resource));
        assert_eq!(result.items.len(), 2);
        let ids: Vec<&str> = result
            .items
            .iter()
            .filter_map(|item| {
                if let LeanWorkerModuleQueryBatchItem::BudgetExceeded { id, .. } = item {
                    Some(id.as_str())
                } else {
                    None
                }
            })
            .collect();
        assert_eq!(ids, vec!["a", "b"]);
    }

    // A declaration-outline selector keeps its id in the degraded item.
    #[test]
    fn declaration_outline_degraded_query_batch_outcome_uses_selector_id() {
        let selectors = vec![LeanWorkerModuleQuerySelector::DeclarationOutline {
            id: "outline".to_owned(),
        }];
        let resource = super::resource_facts(
            "worker_child_abort",
            true,
            Some("worker_process_module_query_batch"),
            None,
            None,
            Some(1),
            Some(2),
            Some("child_abort".to_owned()),
            None,
            None,
            None,
            None,
            None,
            None,
            None,
            None,
        );
        let LeanWorkerModuleQueryBatchOutcome::Ok { result, facts, .. } =
            super::degraded_query_batch_outcome(&selectors, resource.clone())
        else {
            panic!("degraded outcome should be Ok with per-selector items");
        };
        assert_eq!(facts.resource.as_deref(), Some(&resource));
        assert!(matches!(
            result.items.as_slice(),
            [LeanWorkerModuleQueryBatchItem::BudgetExceeded { id, .. }] if id == "outline"
        ));
    }

    // A diagnostics selector keeps its id in the degraded item.
    #[test]
    fn command_message_degraded_query_batch_outcome_uses_diagnostics_selector_id() {
        let selectors = vec![LeanWorkerModuleQuerySelector::Diagnostics {
            id: "messages".to_owned(),
        }];
        let resource = super::resource_facts(
            "worker_child_abort",
            true,
            Some("worker_process_module_query_batch"),
            None,
            None,
            Some(1),
            Some(2),
            Some("child_abort".to_owned()),
            None,
            None,
            None,
            None,
            None,
            None,
            None,
            None,
        );
        let LeanWorkerModuleQueryBatchOutcome::Ok { result, facts, .. } =
            super::degraded_query_batch_outcome(&selectors, resource.clone())
        else {
            panic!("degraded outcome should be Ok with per-selector items");
        };
        assert_eq!(facts.resource.as_deref(), Some(&resource));
        assert!(matches!(
            result.items.as_slice(),
            [LeanWorkerModuleQueryBatchItem::BudgetExceeded { id, .. }] if id == "messages"
        ));
    }

    // Degraded batch verification echoes targets in request order, all `BudgetExceeded`,
    // with no computed axioms and no imports.
    #[test]
    fn declaration_verification_batch_degraded_result_preserves_target_order() {
        let request = LeanWorkerDeclarationVerificationBatchRequest {
            source: "theorem a : True := by trivial\n".to_owned(),
            targets: vec![
                LeanWorkerDeclarationVerificationBatchItem {
                    id: "first".to_owned(),
                    target: LeanWorkerDeclarationVerificationTarget::Name { name: "a".to_owned() },
                },
                LeanWorkerDeclarationVerificationBatchItem {
                    id: "second".to_owned(),
                    target: LeanWorkerDeclarationVerificationTarget::Name { name: "b".to_owned() },
                },
            ],
            sorry_policy: LeanWorkerSorryPolicy::Deny,
            report_axioms: true,
            budgets: LeanWorkerOutputBudgets::default(),
        };
        let LeanWorkerDeclarationVerificationBatchResult::Ok { results, imports } =
            super::degraded_declaration_verification_batch_result(&request)
        else {
            panic!("degraded batch verification should be an Ok batch");
        };
        assert!(imports.is_empty());
        let ids: Vec<&str> = results.iter().map(|row| row.id.as_str()).collect();
        assert_eq!(ids, vec!["first", "second"]);
        assert!(results.iter().all(|row| {
            row.verification_status == LeanWorkerDeclarationVerificationStatus::BudgetExceeded
                && !row.facts.axioms_available
                && row.facts.axioms.is_empty()
        }));
    }

    // `unavailable()` facts must not claim a computed axiom set or a resolved target.
    #[test]
    fn unavailable_verification_facts_report_axioms_uncomputed() {
        let facts = LeanWorkerDeclarationVerificationFacts::unavailable();
        assert!(
            !facts.axioms_available,
            "degraded facts must not claim a computed axiom set"
        );
        assert!(facts.axioms.is_empty());
        assert!(facts.target.is_none());
    }

    // `ChildPanicOrAbort` display includes the exit status and the stderr tail.
    #[test]
    fn display_child_panic_or_abort_includes_stderr_tail() {
        let exit = exit_with("could not dlopen X.dylib: image not found", false);
        let err = LeanWorkerError::ChildPanicOrAbort { exit };
        let rendered = err.to_string();
        assert!(rendered.contains("exit status"), "{rendered}");
        assert!(
            rendered.contains("could not dlopen X.dylib: image not found"),
            "{rendered}"
        );
        assert!(rendered.starts_with("worker exited fatally with "), "{rendered}");
    }

    // `ChildExited` display includes the (trimmed) stderr tail.
    #[test]
    fn display_child_exited_includes_stderr_tail() {
        let exit = exit_with("warning: lean-rs-worker exiting cleanly\n", true);
        let err = LeanWorkerError::ChildExited { exit };
        let rendered = err.to_string();
        assert!(rendered.starts_with("worker exited with "), "{rendered}");
        assert!(
            rendered.contains("warning: lean-rs-worker exiting cleanly"),
            "{rendered}"
        );
    }

    // Empty or whitespace-only diagnostics produce the terse "… with <status>" form.
    #[test]
    fn display_keeps_terse_format_when_diagnostics_empty() {
        let exit = exit_with("", false);
        let err = LeanWorkerError::ChildPanicOrAbort { exit };
        assert_eq!(err.to_string(), "worker exited fatally with exit status: 1");

        let exit = exit_with(" \n\t ", true);
        let err = LeanWorkerError::ChildExited { exit };
        assert_eq!(err.to_string(), "worker exited with exit status: 0");
    }

    // Oversized diagnostics are truncated and annotated rather than rendered in full.
    #[test]
    fn display_truncates_oversized_diagnostics_with_annotation() {
        let large: String = "x".repeat(DISPLAY_DIAGNOSTICS_MAX_BYTES * 2);
        let exit = exit_with(&large, false);
        let err = LeanWorkerError::ChildPanicOrAbort { exit };
        let rendered = err.to_string();
        assert!(rendered.contains("bytes truncated"), "{rendered}");
        assert!(
            rendered.len() < large.len() + 128,
            "rendered length {} unexpectedly large for original {}",
            rendered.len(),
            large.len()
        );
    }

    // Text within the cap is returned unchanged.
    #[test]
    fn truncate_for_display_returns_input_when_under_cap() {
        let s = "short message";
        assert_eq!(truncate_for_display(s, 1024), s);
    }

    // A cap in the middle of a two-byte `é` backs off to the previous char boundary.
    #[test]
    fn truncate_for_display_cuts_at_char_boundary() {
        let s = "ééééé";
        let out = truncate_for_display(s, 5);
        let before_marker = out.split('…').next().unwrap_or("");
        assert!(before_marker.is_char_boundary(before_marker.len()));
        assert!(out.contains("bytes truncated"), "{out}");
    }

    // A cap inside the unterminated `ESC [ 3 1` prefix moves the cut back before the ESC.
    #[test]
    fn truncate_for_display_does_not_split_ansi_csi() {
        let mut s = String::from("hello ");
        s.push('\x1b');
        s.push('[');
        s.push('3');
        s.push('1');
        s.push('m');
        s.push_str("RED");
        let cap = s.find('1').expect("test fixture invariant: '1' present");
        let out = truncate_for_display(&s, cap);
        let before_marker = out.split('…').next().unwrap_or("");
        assert!(
            !before_marker.contains('\x1b'),
            "truncated prefix still contains ESC: {before_marker:?}"
        );
        assert!(out.contains("bytes truncated"), "{out}");
    }

    // A complete escape sequence before the cut is preserved in the output.
    #[test]
    fn truncate_for_display_keeps_terminated_ansi_csi() {
        let mut s = String::from("\x1b[31mRED\x1b[0m ");
        s.push_str(&"x".repeat(64));
        let out = truncate_for_display(&s, 20);
        assert!(out.contains("\x1b[31m"), "{out}");
        assert!(out.contains("bytes truncated"), "{out}");
    }
}