use std::ffi::OsString;
use std::fmt;
use std::io::{BufReader, BufWriter, Read as _};
use std::path::{Path, PathBuf};
use std::process::{Child, ChildStderr, ChildStdin, ChildStdout, Command, ExitStatus, Stdio};
use std::sync::mpsc;
use std::thread;
use std::time::{Duration, Instant};
use crate::capability::LeanWorkerBootstrapDiagnosticCode;
use crate::protocol::{Message, Request, Response, read_frame, write_frame};
use crate::session::LeanWorkerDataSinkTarget;
use crate::session::{
LeanWorkerCancellationToken, LeanWorkerCapabilityMetadata, LeanWorkerDataSink, LeanWorkerDiagnosticSink,
LeanWorkerDoctorReport, LeanWorkerElabOptions, LeanWorkerElabResult, LeanWorkerKernelResult,
LeanWorkerProgressSink, LeanWorkerRawDataSink, LeanWorkerRuntimeMetadata, LeanWorkerSessionConfig,
LeanWorkerStreamSummary, check_cancelled, elapsed_event, report_parent_data_row, report_parent_diagnostic,
report_parent_progress,
};
/// Startup (handshake) timeout installed by `LeanWorkerConfig::new` unless overridden: 10 seconds.
const DEFAULT_STARTUP_TIMEOUT: Duration = Duration::from_secs(10);
/// Capacity used for the worker event buffer.
/// NOTE(review): not referenced in this part of the file — confirm where it is consumed.
const WORKER_EVENT_BUFFER_CAPACITY: usize = 64;
/// Request timeout installed by `LeanWorkerConfig::new` unless overridden: 30 seconds.
pub const LEAN_WORKER_REQUEST_TIMEOUT_DEFAULT: Duration = Duration::from_secs(30);
/// Request timeout selected by `LeanWorkerConfig::long_running_requests`: 10 minutes.
pub const LEAN_WORKER_REQUEST_TIMEOUT_LONG_RUNNING: Duration = Duration::from_mins(10);
/// Builder-style settings consumed by `LeanWorker::spawn` when launching the worker child.
#[derive(Clone, Debug)]
pub struct LeanWorkerConfig {
/// Path of the worker executable passed to `Command::new`.
executable: PathBuf,
/// Working directory for the child; `None` leaves the `Command` default untouched.
current_dir: Option<PathBuf>,
/// Extra environment variables, applied in insertion order after the built-in ones.
env: Vec<(OsString, OsString)>,
/// How long `spawn` waits for the handshake before killing the child.
startup_timeout: Duration,
/// Per-request timeout exposed through `LeanWorker::request_timeout`.
/// NOTE(review): enforcement happens outside this part of the file — confirm.
request_timeout: Duration,
/// Restart limits for the worker.
/// NOTE(review): evaluated outside this part of the file — confirm where.
restart_policy: LeanWorkerRestartPolicy,
}
impl LeanWorkerConfig {
    /// Creates a configuration for the worker binary at `executable`.
    ///
    /// The result has no working directory override, no extra environment
    /// variables, the default startup and request timeouts, and the default
    /// restart policy.
    pub fn new(executable: impl Into<PathBuf>) -> Self {
        let executable = executable.into();
        Self {
            executable,
            current_dir: None,
            env: Vec::new(),
            startup_timeout: DEFAULT_STARTUP_TIMEOUT,
            request_timeout: LEAN_WORKER_REQUEST_TIMEOUT_DEFAULT,
            restart_policy: LeanWorkerRestartPolicy::default(),
        }
    }

    /// Returns the configured worker executable path.
    pub fn executable(&self) -> &Path {
        self.executable.as_path()
    }

    /// Sets the working directory the child is started in.
    #[must_use]
    pub fn current_dir(self, path: impl Into<PathBuf>) -> Self {
        Self {
            current_dir: Some(path.into()),
            ..self
        }
    }

    /// Appends one environment variable for the child process.
    ///
    /// Entries are kept in insertion order; earlier entries are not removed.
    #[must_use]
    pub fn env(mut self, key: impl Into<OsString>, value: impl Into<OsString>) -> Self {
        let entry = (key.into(), value.into());
        self.env.push(entry);
        self
    }

    /// Overrides how long `LeanWorker::spawn` waits for the handshake.
    #[must_use]
    pub fn startup_timeout(self, timeout: Duration) -> Self {
        Self {
            startup_timeout: timeout,
            ..self
        }
    }

    /// Overrides the per-request timeout.
    #[must_use]
    pub fn request_timeout(self, timeout: Duration) -> Self {
        Self {
            request_timeout: timeout,
            ..self
        }
    }

    /// Switches the per-request timeout to `LEAN_WORKER_REQUEST_TIMEOUT_LONG_RUNNING`.
    #[must_use]
    pub fn long_running_requests(self) -> Self {
        self.request_timeout(LEAN_WORKER_REQUEST_TIMEOUT_LONG_RUNNING)
    }

    /// Replaces the restart policy.
    #[must_use]
    pub fn restart_policy(self, policy: LeanWorkerRestartPolicy) -> Self {
        Self {
            restart_policy: policy,
            ..self
        }
    }
}
/// Optional limits describing when a worker should be restarted.
///
/// Every limit is `None` by default, which is also what [`Self::disabled`] returns.
#[derive(Clone, Debug, Default, Eq, PartialEq)]
pub struct LeanWorkerRestartPolicy {
    /// Request-count limit; never stored as zero.
    max_requests: Option<u64>,
    /// Import-count limit; never stored as zero.
    max_imports: Option<u64>,
    /// Resident-set-size limit in KiB; never stored as zero.
    max_rss_kib: Option<u64>,
    /// Idle-time limit; stored exactly as given.
    idle_restart_after: Option<Duration>,
}

impl LeanWorkerRestartPolicy {
    /// Returns a policy with every limit unset (identical to `Default::default()`).
    #[must_use]
    pub fn disabled() -> Self {
        Self {
            max_requests: None,
            max_imports: None,
            max_rss_kib: None,
            idle_restart_after: None,
        }
    }

    /// Sets the request-count limit; a `limit` of zero is raised to one.
    #[must_use]
    pub fn max_requests(self, limit: u64) -> Self {
        Self {
            max_requests: Some(limit.max(1)),
            ..self
        }
    }

    /// Sets the import-count limit; a `limit` of zero is raised to one.
    #[must_use]
    pub fn max_imports(self, limit: u64) -> Self {
        Self {
            max_imports: Some(limit.max(1)),
            ..self
        }
    }

    /// Sets the RSS limit in KiB; a `limit` of zero is raised to one.
    #[must_use]
    pub fn max_rss_kib(self, limit: u64) -> Self {
        Self {
            max_rss_kib: Some(limit.max(1)),
            ..self
        }
    }

    /// Sets the idle-time limit; the duration is stored unchanged.
    #[must_use]
    pub fn idle_restart_after(self, duration: Duration) -> Self {
        Self {
            idle_restart_after: Some(duration),
            ..self
        }
    }
}
/// Why a worker was restarted; stored in `LeanWorkerStats::last_restart_reason`
/// and used by `LeanWorkerStats::record_restart` to pick the per-reason counter.
#[derive(Clone, Debug, Eq, PartialEq)]
pub enum LeanWorkerRestartReason {
/// Restart requested directly (this is the reason used by `LeanWorker::cycle`).
Explicit,
/// Restart attributed to the request-count limit `limit`.
MaxRequests { limit: u64 },
/// Restart attributed to the import-count limit `limit`.
MaxImports { limit: u64 },
/// Restart attributed to RSS: sampled `current_kib` against `limit_kib`.
RssCeiling { current_kib: u64, limit_kib: u64 },
/// Restart attributed to idleness: idle for `idle_for` against `limit`.
Idle { idle_for: Duration, limit: Duration },
/// Restart attributed to cancellation of the named operation.
Cancelled { operation: &'static str },
/// Restart attributed to the named operation timing out after `duration`.
RequestTimeout {
operation: &'static str,
duration: Duration,
},
}
/// Counters describing a worker's lifetime; a snapshot is returned by `LeanWorker::stats`.
///
/// NOTE(review): fields without a comment below are updated outside this part
/// of the file — confirm their exact meaning at the update sites.
#[derive(Clone, Debug, Default, Eq, PartialEq)]
pub struct LeanWorkerStats {
pub requests: u64,
pub imports: u64,
/// Incremented when `LeanWorker::status` first observes the child has exited.
pub exits: u64,
/// Total restarts; bumped on every `record_restart` call regardless of reason.
pub restarts: u64,
/// Restarts recorded with `LeanWorkerRestartReason::Explicit`.
pub explicit_cycles: u64,
/// Restarts recorded with `LeanWorkerRestartReason::MaxRequests`.
pub max_request_restarts: u64,
/// Restarts recorded with `LeanWorkerRestartReason::MaxImports`.
pub max_import_restarts: u64,
/// Restarts recorded with `LeanWorkerRestartReason::RssCeiling`.
pub rss_restarts: u64,
/// Restarts recorded with `LeanWorkerRestartReason::Idle`.
pub idle_restarts: u64,
/// Restarts recorded with `LeanWorkerRestartReason::Cancelled`.
pub cancelled_restarts: u64,
/// Restarts recorded with `LeanWorkerRestartReason::RequestTimeout`.
pub timeout_restarts: u64,
/// Number of `LeanWorker::rss_kib` calls that could not obtain a sample.
pub rss_samples_unavailable: u64,
/// Most recent successful RSS sample, in KiB.
pub last_rss_kib: Option<u64>,
/// Reason passed to the most recent `record_restart` call.
pub last_restart_reason: Option<LeanWorkerRestartReason>,
/// Incremented once per data-stream request after it has been sent.
pub stream_requests: u64,
pub stream_successes: u64,
pub stream_failures: u64,
pub data_rows_delivered: u64,
pub data_row_payload_bytes: u64,
pub stream_elapsed: Duration,
pub backpressure_waits: u64,
pub backpressure_failures: u64,
}
impl LeanWorkerStats {
    /// Records one restart.
    ///
    /// Bumps the total `restarts` counter and the counter matching `reason`
    /// (both saturating), then stores `reason` as `last_restart_reason`.
    fn record_restart(&mut self, reason: LeanWorkerRestartReason) {
        self.restarts = self.restarts.saturating_add(1);

        // Select the per-reason counter first, then bump it in one place.
        let per_reason: &mut u64 = match &reason {
            LeanWorkerRestartReason::Explicit => &mut self.explicit_cycles,
            LeanWorkerRestartReason::MaxRequests { .. } => &mut self.max_request_restarts,
            LeanWorkerRestartReason::MaxImports { .. } => &mut self.max_import_restarts,
            LeanWorkerRestartReason::RssCeiling { .. } => &mut self.rss_restarts,
            LeanWorkerRestartReason::Idle { .. } => &mut self.idle_restarts,
            LeanWorkerRestartReason::Cancelled { .. } => &mut self.cancelled_restarts,
            LeanWorkerRestartReason::RequestTimeout { .. } => &mut self.timeout_restarts,
        };
        *per_reason = per_reason.saturating_add(1);

        self.last_restart_reason = Some(reason);
    }
}
/// Liveness of the worker child as reported by `LeanWorker::status`.
#[derive(Clone, Debug, Eq, PartialEq)]
pub enum LeanWorkerStatus {
/// The child has not exited (`try_wait` returned no status).
Running,
/// The child has exited, or there is no child handle; carries the exit details.
Exited(LeanWorkerExit),
}
/// Details of how the worker child exited.
#[derive(Clone, Debug, Eq, PartialEq)]
pub struct LeanWorkerExit {
    /// Value of `ExitStatus::success()`.
    pub success: bool,
    /// Value of `ExitStatus::code()`; `None` when the platform reports no exit code.
    pub code: Option<i32>,
    /// `Display` rendering of the exit status.
    pub status: String,
    /// Diagnostic text supplied by the caller that recorded the exit.
    pub diagnostics: String,
}

impl LeanWorkerExit {
    /// Builds an exit record from a process `status` and the collected `diagnostics` text.
    fn from_status(status: ExitStatus, diagnostics: String) -> Self {
        let success = status.success();
        let code = status.code();
        let status = status.to_string();
        Self {
            success,
            code,
            status,
            diagnostics,
        }
    }
}
/// Every failure the worker supervisor can report.
///
/// The user-facing text for each variant is produced by the `Display`
/// implementation; the comments below summarize that text.
#[derive(Debug)]
pub enum LeanWorkerError {
/// The worker executable could not be spawned; `source` is the OS error.
Spawn {
executable: PathBuf,
source: std::io::Error,
},
/// No worker child binary could be resolved; `tried` lists the candidate paths.
WorkerChildUnresolved {
tried: Vec<PathBuf>,
},
/// The worker child at `path` is not executable for the stated `reason`.
WorkerChildNotExecutable { path: PathBuf, reason: String },
/// A bootstrap check identified by `code` failed.
Bootstrap {
code: LeanWorkerBootstrapDiagnosticCode,
message: String,
},
/// Building the capability Lake target failed; `diagnostic` is also exposed as the error source.
CapabilityBuild {
diagnostic: lean_toolchain::LinkDiagnostics,
},
/// Child setup failed (for example a missing stdin/stdout pipe during spawn).
Setup { message: String },
/// The startup handshake failed.
Handshake { message: String },
/// The wire protocol failed (for example a request could not be JSON-encoded).
Protocol { message: String },
/// The worker returned an error identified by `code`.
Worker { code: String, message: String },
/// The worker child exited.
ChildExited { exit: LeanWorkerExit },
/// The worker child exited fatally.
ChildPanicOrAbort { exit: LeanWorkerExit },
/// The named operation did not finish within `duration`.
Timeout {
operation: &'static str,
duration: Duration,
},
/// The named operation was cancelled.
Cancelled { operation: &'static str },
/// The progress sink panicked.
ProgressPanic { message: String },
/// The data sink panicked.
DataSinkPanic { message: String },
/// The diagnostic sink panicked.
DiagnosticSinkPanic { message: String },
/// A streaming export returned the given status byte.
StreamExportFailed { status: u8 },
/// A streaming callback failed with the given status byte and description.
StreamCallbackFailed { status: u8, description: String },
/// A streaming export emitted a malformed row.
StreamRowMalformed { message: String },
/// The capability metadata export returned malformed JSON.
CapabilityMetadataMalformed { message: String },
/// Capability metadata from `export` did not match the expectation.
CapabilityMetadataMismatch {
export: String,
expected: Box<LeanWorkerCapabilityMetadata>,
actual: Box<LeanWorkerCapabilityMetadata>,
},
/// The capability doctor export returned malformed JSON.
CapabilityDoctorMalformed { message: String },
/// Encoding the request JSON of typed command `export` failed.
TypedCommandRequestEncode { export: String, message: String },
/// Decoding the response JSON of typed command `export` failed.
TypedCommandResponseDecode { export: String, message: String },
/// Decoding a row of typed command `export` failed at `stream` / `sequence`.
TypedCommandRowDecode {
export: String,
stream: String,
sequence: u64,
message: String,
},
/// Decoding the terminal summary of typed command `export` failed.
TypedCommandSummaryDecode { export: String, message: String },
/// A worker pool lease was invalidated.
LeaseInvalidated { reason: String },
/// The worker pool cannot admit another session key.
WorkerPoolExhausted { max_workers: usize },
/// The worker pool cannot admit work within its RSS budget.
WorkerPoolMemoryBudgetExceeded { current_kib: u64, limit_kib: u64 },
/// Worker pool admission timed out after `waited`.
WorkerPoolQueueTimeout { waited: Duration },
/// The named operation is not supported.
UnsupportedRequest { operation: &'static str },
/// Waiting for (or signalling) the worker child failed; `source` is the OS error.
Wait { source: std::io::Error },
}
impl fmt::Display for LeanWorkerError {
    /// Writes the single-line, human-readable message for this error.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        match self {
            // Process launch and child resolution.
            Self::Spawn { executable, source } => {
                write!(f, "failed to spawn worker {}: {source}", executable.display())
            }
            Self::WorkerChildUnresolved { tried } => {
                // Render the candidate paths as a comma-separated list.
                let mut listing = String::new();
                for (index, candidate) in tried.iter().enumerate() {
                    if index > 0 {
                        listing.push_str(", ");
                    }
                    listing.push_str(&candidate.display().to_string());
                }
                write!(
                    f,
                    "could not resolve lean-rs-worker-child; set LEAN_RS_WORKER_CHILD or place it beside the current executable (tried: {tried})",
                    tried = listing
                )
            }
            Self::WorkerChildNotExecutable { path, reason } => {
                write!(f, "worker child '{}' is not executable: {reason}", path.display())
            }
            Self::Bootstrap { code, message } => {
                write!(f, "worker bootstrap check {code} failed: {message}")
            }
            Self::CapabilityBuild { diagnostic } => {
                write!(f, "worker capability Lake target build failed: {diagnostic}")
            }

            // Session establishment and wire protocol.
            Self::Setup { message } => {
                write!(f, "worker child setup failed: {message}")
            }
            Self::Handshake { message } => {
                write!(f, "worker handshake failed: {message}")
            }
            Self::Protocol { message } => {
                write!(f, "worker protocol failed: {message}")
            }
            Self::Worker { code, message } => {
                write!(f, "worker returned {code}: {message}")
            }

            // Child lifecycle.
            Self::ChildExited { exit } => {
                write!(f, "worker exited with {}", exit.status)
            }
            Self::ChildPanicOrAbort { exit } => {
                write!(f, "worker exited fatally with {}", exit.status)
            }
            Self::Timeout { operation, duration } => {
                write!(f, "worker operation {operation} timed out after {duration:?}")
            }
            Self::Cancelled { operation } => {
                write!(f, "worker operation {operation} was cancelled")
            }

            // Caller-provided sinks.
            Self::ProgressPanic { message } => {
                write!(f, "worker progress sink panicked: {message}")
            }
            Self::DataSinkPanic { message } => {
                write!(f, "worker data sink panicked: {message}")
            }
            Self::DiagnosticSinkPanic { message } => {
                write!(f, "worker diagnostic sink panicked: {message}")
            }

            // Streaming exports.
            Self::StreamExportFailed { status } => {
                write!(f, "streaming export returned status {status}")
            }
            Self::StreamCallbackFailed { status, description } => {
                write!(f, "streaming callback failed with status {status}: {description}")
            }
            Self::StreamRowMalformed { message } => {
                write!(f, "streaming export emitted malformed row: {message}")
            }

            // Capability metadata and doctor exports.
            Self::CapabilityMetadataMalformed { message } => {
                write!(f, "capability metadata export returned malformed JSON: {message}")
            }
            Self::CapabilityMetadataMismatch { export, .. } => {
                write!(f, "capability metadata from {export} did not match expectation")
            }
            Self::CapabilityDoctorMalformed { message } => {
                write!(f, "capability doctor export returned malformed JSON: {message}")
            }

            // Typed commands.
            Self::TypedCommandRequestEncode { export, message } => {
                write!(f, "typed worker command {export} request JSON encode failed: {message}")
            }
            Self::TypedCommandResponseDecode { export, message } => {
                write!(f, "typed worker command {export} response JSON decode failed: {message}")
            }
            Self::TypedCommandRowDecode {
                export,
                stream,
                sequence,
                message,
            } => {
                write!(
                    f,
                    "typed worker command {export} row decode failed at stream {stream} sequence {sequence}: {message}"
                )
            }
            Self::TypedCommandSummaryDecode { export, message } => {
                write!(f, "typed worker command {export} terminal summary decode failed: {message}")
            }

            // Worker pool admission.
            Self::LeaseInvalidated { reason } => {
                write!(f, "worker pool lease was invalidated: {reason}")
            }
            Self::WorkerPoolExhausted { max_workers } => {
                write!(f, "worker pool cannot admit another session key; max_workers={max_workers}")
            }
            Self::WorkerPoolMemoryBudgetExceeded { current_kib, limit_kib } => {
                write!(
                    f,
                    "worker pool cannot admit work within RSS budget; current_kib={current_kib} limit_kib={limit_kib}"
                )
            }
            Self::WorkerPoolQueueTimeout { waited } => {
                write!(f, "worker pool admission timed out after {waited:?}")
            }

            // Miscellaneous.
            Self::UnsupportedRequest { operation } => {
                write!(f, "worker operation {operation} is not supported")
            }
            Self::Wait { source } => {
                write!(f, "failed to wait for worker child: {source}")
            }
        }
    }
}
impl std::error::Error for LeanWorkerError {
fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
match self {
Self::Spawn { source, .. } | Self::Wait { source } => Some(source),
Self::CapabilityBuild { diagnostic } => Some(diagnostic),
Self::WorkerChildUnresolved { .. } | Self::WorkerChildNotExecutable { .. } | Self::Bootstrap { .. } => None,
Self::Setup { .. }
| Self::Handshake { .. }
| Self::Protocol { .. }
| Self::Worker { .. }
| Self::ChildExited { .. }
| Self::ChildPanicOrAbort { .. }
| Self::Timeout { .. }
| Self::Cancelled { .. }
| Self::ProgressPanic { .. }
| Self::DataSinkPanic { .. }
| Self::DiagnosticSinkPanic { .. }
| Self::StreamExportFailed { .. }
| Self::StreamCallbackFailed { .. }
| Self::StreamRowMalformed { .. }
| Self::CapabilityMetadataMalformed { .. }
| Self::CapabilityMetadataMismatch { .. }
| Self::CapabilityDoctorMalformed { .. }
| Self::TypedCommandRequestEncode { .. }
| Self::TypedCommandResponseDecode { .. }
| Self::TypedCommandRowDecode { .. }
| Self::TypedCommandSummaryDecode { .. }
| Self::LeaseInvalidated { .. }
| Self::WorkerPoolExhausted { .. }
| Self::WorkerPoolMemoryBudgetExceeded { .. }
| Self::WorkerPoolQueueTimeout { .. }
| Self::UnsupportedRequest { .. } => None,
}
}
}
/// Handle to one worker child process plus the pipes and bookkeeping around it.
#[derive(Debug)]
pub struct LeanWorker {
/// Configuration the worker was spawned with (cloned from the caller's value).
config: LeanWorkerConfig,
/// Child process handle; cleared once `status` has observed the exit.
child: Option<Child>,
/// Buffered writer over the child's stdin; cleared together with `child`.
stdin: Option<BufWriter<ChildStdin>>,
/// Buffered reader over the child's stdout; cleared together with `child`.
stdout: Option<BufReader<ChildStdout>>,
/// The child's stderr pipe, if it was available at spawn time.
stderr: Option<ChildStderr>,
/// Exit details cached by `status` so later calls return the same value.
last_exit: Option<LeanWorkerExit>,
/// Metadata obtained from the startup handshake.
runtime_metadata: LeanWorkerRuntimeMetadata,
/// Lifetime counters; see `LeanWorkerStats`.
stats: LeanWorkerStats,
/// Starts at zero on spawn.
/// NOTE(review): presumably counts requests since the last restart — updated outside this part of the file.
requests_since_restart: u64,
/// Starts at zero on spawn.
/// NOTE(review): presumably counts imports since the last restart — updated outside this part of the file.
imports_since_restart: u64,
/// Initialised to `Instant::now()` on spawn.
/// NOTE(review): presumably refreshed per request for idle tracking — confirm.
last_activity: Instant,
}
impl LeanWorker {
/// Spawns the worker executable described by `config` and waits for its handshake.
///
/// The child is started with piped stdin/stdout/stderr and with
/// `LEAN_ABORT_ON_PANIC=1` and `RUST_BACKTRACE=0`. Variables from `config` are
/// applied afterwards, so they can override those defaults. The handshake is
/// read on a helper thread so that the wait can be bounded by the configured
/// startup timeout.
///
/// # Errors
///
/// * `Spawn` — the process could not be started.
/// * `Setup` — the child's stdin or stdout pipe is unavailable.
/// * `ChildPanicOrAbort` / `ChildExited` — the handshake failed and the child
///   had already exited (unsuccessfully / successfully).
/// * the handshake error itself — the handshake failed while the child was
///   still running.
/// * `Timeout` — no handshake arrived within the startup timeout; the child is
///   killed and reaped first.
/// * `Handshake` — the reader thread ended without reporting a result; the
///   child is killed and reaped first.
pub fn spawn(config: &LeanWorkerConfig) -> Result<Self, LeanWorkerError> {
    let mut command = Command::new(&config.executable);
    command
        .stdin(Stdio::piped())
        .stdout(Stdio::piped())
        .stderr(Stdio::piped())
        .env("LEAN_ABORT_ON_PANIC", "1")
        .env("RUST_BACKTRACE", "0");
    if let Some(current_dir) = &config.current_dir {
        command.current_dir(current_dir);
    }
    // Caller-supplied variables go last so they win over the defaults above.
    for (key, value) in &config.env {
        command.env(key, value);
    }
    let mut child = command.spawn().map_err(|source| LeanWorkerError::Spawn {
        executable: config.executable.clone(),
        source,
    })?;
    let stdin = child
        .stdin
        .take()
        .map(BufWriter::new)
        .ok_or_else(|| LeanWorkerError::Setup {
            message: "child stdin unavailable".to_owned(),
        })?;
    let stdout = child.stdout.take().ok_or_else(|| LeanWorkerError::Setup {
        message: "child stdout unavailable".to_owned(),
    })?;
    let stderr = child.stderr.take();

    // Read the handshake on a detached thread; the reader is handed back through
    // the channel so it can be reused for later responses.
    let (sender, receiver) = mpsc::channel();
    let _handshake_reader = thread::spawn(move || {
        let mut stdout = BufReader::new(stdout);
        let result = expect_handshake(&mut stdout);
        drop(sender.send((stdout, result)));
    });

    let (stdout, runtime_metadata) = match receiver.recv_timeout(config.startup_timeout) {
        Ok((stdout, Ok(metadata))) => (stdout, metadata),
        Ok((_stdout, Err(err))) => {
            // Wrap the child in a temporary worker so the exit-recording helper
            // can tell a crashed child apart from a live one that spoke badly.
            let mut worker = Self {
                config: config.clone(),
                child: Some(child),
                stdin: Some(stdin),
                stdout: None,
                stderr,
                last_exit: None,
                runtime_metadata: LeanWorkerRuntimeMetadata {
                    worker_version: String::new(),
                    protocol_version: crate::protocol::PROTOCOL_VERSION,
                    lean_version: None,
                },
                stats: LeanWorkerStats::default(),
                requests_since_restart: 0,
                imports_since_restart: 0,
                last_activity: Instant::now(),
            };
            let exit = worker.try_record_exit();
            return Err(match exit {
                Some(exit) if !exit.success => LeanWorkerError::ChildPanicOrAbort { exit },
                Some(exit) => LeanWorkerError::ChildExited { exit },
                None => err,
            });
        }
        Err(mpsc::RecvTimeoutError::Timeout) => {
            drop(child.kill());
            let _exit = wait_with_stderr(&mut child, stderr)?;
            return Err(LeanWorkerError::Timeout {
                operation: "startup",
                duration: config.startup_timeout,
            });
        }
        Err(mpsc::RecvTimeoutError::Disconnected) => {
            // The reader thread dropped its sender without reporting, so no
            // handshake can arrive any more. `Child` does not stop the process
            // when dropped, so kill and reap it here (best effort) instead of
            // leaving an orphaned worker behind.
            drop(child.kill());
            drop(child.wait());
            return Err(LeanWorkerError::Handshake {
                message: "handshake reader exited without a result".to_owned(),
            });
        }
    };

    Ok(Self {
        config: config.clone(),
        child: Some(child),
        stdin: Some(stdin),
        stdout: Some(stdout),
        stderr,
        last_exit: None,
        runtime_metadata,
        stats: LeanWorkerStats::default(),
        requests_since_restart: 0,
        imports_since_restart: 0,
        last_activity: Instant::now(),
    })
}
pub fn health(&mut self) -> Result<(), LeanWorkerError> {
self.prepare_request(false)?;
self.send_request(Request::Health)?;
self.record_request(false);
match self.read_response("health")? {
Response::HealthOk => Ok(()),
other @ (Response::CapabilityLoaded
| Response::U64 { .. }
| Response::HostSessionOpened
| Response::Elaboration { .. }
| Response::KernelCheck { .. }
| Response::Strings { .. }
| Response::StreamComplete { .. }
| Response::StreamExportFailed { .. }
| Response::StreamCallbackFailed { .. }
| Response::StreamRowMalformed { .. }
| Response::CapabilityMetadata { .. }
| Response::CapabilityDoctor { .. }
| Response::CapabilityMetadataMalformed { .. }
| Response::CapabilityDoctorMalformed { .. }
| Response::JsonCommand { .. }
| Response::RowsComplete { .. }
| Response::Terminating
| Response::Error { .. }) => Err(unexpected_response("health", &other)),
}
}
pub fn load_fixture_capability(&mut self, fixture_root: impl AsRef<Path>) -> Result<(), LeanWorkerError> {
self.prepare_request(true)?;
self.send_request(Request::LoadFixtureCapability {
fixture_root: path_string(fixture_root.as_ref()),
})?;
self.record_request(true);
match self.read_response("load_fixture_capability")? {
Response::CapabilityLoaded => Ok(()),
other @ (Response::HealthOk
| Response::U64 { .. }
| Response::HostSessionOpened
| Response::Elaboration { .. }
| Response::KernelCheck { .. }
| Response::Strings { .. }
| Response::StreamComplete { .. }
| Response::StreamExportFailed { .. }
| Response::StreamCallbackFailed { .. }
| Response::StreamRowMalformed { .. }
| Response::CapabilityMetadata { .. }
| Response::CapabilityDoctor { .. }
| Response::CapabilityMetadataMalformed { .. }
| Response::CapabilityDoctorMalformed { .. }
| Response::JsonCommand { .. }
| Response::RowsComplete { .. }
| Response::Terminating
| Response::Error { .. }) => Err(unexpected_response("load_fixture_capability", &other)),
}
}
pub fn call_fixture_mul(
&mut self,
fixture_root: impl AsRef<Path>,
lhs: u64,
rhs: u64,
) -> Result<u64, LeanWorkerError> {
self.prepare_request(true)?;
self.send_request(Request::CallFixtureMul {
fixture_root: path_string(fixture_root.as_ref()),
lhs,
rhs,
})?;
self.record_request(true);
match self.read_response("call_fixture_mul")? {
Response::U64 { value } => Ok(value),
other @ (Response::HealthOk
| Response::CapabilityLoaded
| Response::HostSessionOpened
| Response::Elaboration { .. }
| Response::KernelCheck { .. }
| Response::Strings { .. }
| Response::StreamComplete { .. }
| Response::StreamExportFailed { .. }
| Response::StreamCallbackFailed { .. }
| Response::StreamRowMalformed { .. }
| Response::CapabilityMetadata { .. }
| Response::CapabilityDoctor { .. }
| Response::CapabilityMetadataMalformed { .. }
| Response::CapabilityDoctorMalformed { .. }
| Response::JsonCommand { .. }
| Response::RowsComplete { .. }
| Response::Terminating
| Response::Error { .. }) => Err(unexpected_response("call_fixture_mul", &other)),
}
}
/// Reports whether the worker child is running or has exited, without blocking.
///
/// A previously recorded exit is returned as-is. With no child handle a
/// synthetic "worker is not running" exit is returned. When the poll observes
/// a fresh exit, stderr is collected, the exit is cached, the child and its
/// stdin/stdout handles are released, and the `exits` counter is bumped.
///
/// # Errors
///
/// Returns `LeanWorkerError::Wait` when polling the child fails.
pub fn status(&mut self) -> Result<LeanWorkerStatus, LeanWorkerError> {
    if let Some(recorded) = self.last_exit.as_ref() {
        return Ok(LeanWorkerStatus::Exited(recorded.clone()));
    }

    let polled = match self.child.as_mut() {
        Some(child) => child.try_wait().map_err(|source| LeanWorkerError::Wait { source })?,
        None => {
            return Ok(LeanWorkerStatus::Exited(LeanWorkerExit {
                success: false,
                code: None,
                status: "worker is not running".to_owned(),
                diagnostics: String::new(),
            }));
        }
    };
    let Some(exit_status) = polled else {
        return Ok(LeanWorkerStatus::Running);
    };

    let diagnostics = self.read_stderr();
    let exit = LeanWorkerExit::from_status(exit_status, diagnostics);
    self.last_exit = Some(exit.clone());
    self.child = None;
    self.stdin = None;
    self.stdout = None;
    self.stats.exits = self.stats.exits.saturating_add(1);
    Ok(LeanWorkerStatus::Exited(exit))
}
#[must_use]
pub fn stats(&self) -> LeanWorkerStats {
self.stats.clone()
}
#[must_use]
pub fn runtime_metadata(&self) -> LeanWorkerRuntimeMetadata {
self.runtime_metadata.clone()
}
/// Samples the child's resident set size in KiB and updates the statistics.
///
/// A successful sample is stored in `stats.last_rss_kib`. A failed one
/// increments `stats.rss_samples_unavailable`. The sample is returned either way.
pub fn rss_kib(&mut self) -> Option<u64> {
    let sample = self.child_rss_kib();
    if let Some(kib) = sample {
        self.stats.last_rss_kib = Some(kib);
    } else {
        self.stats.rss_samples_unavailable = self.stats.rss_samples_unavailable.saturating_add(1);
    }
    sample
}
#[must_use]
pub fn request_timeout(&self) -> Duration {
self.config.request_timeout
}
/// Replaces the per-request timeout stored in this worker's configuration.
pub fn set_request_timeout(&mut self, timeout: Duration) {
    let config = &mut self.config;
    config.request_timeout = timeout;
}
/// Restarts the worker, recording the restart as `LeanWorkerRestartReason::Explicit`.
pub fn cycle(&mut self) -> Result<(), LeanWorkerError> {
    let reason = LeanWorkerRestartReason::Explicit;
    self.restart_with_reason(reason)
}
pub(crate) fn cycle_with_restart_reason(&mut self, reason: LeanWorkerRestartReason) -> Result<(), LeanWorkerError> {
self.restart_with_reason(reason)
}
pub fn restart(&mut self) -> Result<(), LeanWorkerError> {
self.cycle()
}
/// Test hook: kills the worker child without waiting for it.
///
/// # Errors
///
/// Returns the error from `dead_error` when there is no child handle, or
/// `LeanWorkerError::Wait` when the kill call fails.
#[doc(hidden)]
pub fn __kill_for_test(&mut self) -> Result<(), LeanWorkerError> {
    if let Some(child) = self.child.as_mut() {
        return child.kill().map_err(|source| LeanWorkerError::Wait { source });
    }
    Err(self.dead_error())
}
pub fn terminate(mut self) -> Result<LeanWorkerExit, LeanWorkerError> {
self.send_request(Request::Terminate)?;
match self.read_response("terminate")? {
Response::Terminating => self.wait_for_exit(),
other @ (Response::HealthOk
| Response::CapabilityLoaded
| Response::U64 { .. }
| Response::HostSessionOpened
| Response::Elaboration { .. }
| Response::KernelCheck { .. }
| Response::Strings { .. }
| Response::StreamComplete { .. }
| Response::StreamExportFailed { .. }
| Response::StreamCallbackFailed { .. }
| Response::StreamRowMalformed { .. }
| Response::CapabilityMetadata { .. }
| Response::CapabilityDoctor { .. }
| Response::CapabilityMetadataMalformed { .. }
| Response::CapabilityDoctorMalformed { .. }
| Response::JsonCommand { .. }
| Response::RowsComplete { .. }
| Response::Error { .. }) => Err(unexpected_response("terminate", &other)),
}
}
/// Test hook: asks the fixture to panic inside Lean and returns the resulting exit.
///
/// Success here means the read failed with `ChildPanicOrAbort`; its exit is
/// returned. A response that does arrive is reported through
/// `unexpected_response`, and every other error is passed on unchanged.
#[doc(hidden)]
pub fn __trigger_lean_panic_fixture(
    mut self,
    fixture_root: impl AsRef<Path>,
) -> Result<LeanWorkerExit, LeanWorkerError> {
    const OPERATION: &str = "trigger_lean_panic";

    self.prepare_request(true)?;
    let request = Request::TriggerLeanPanic {
        fixture_root: path_string(fixture_root.as_ref()),
    };
    self.send_request(request)?;
    self.record_request(true);

    let outcome = self.read_response(OPERATION);
    match outcome {
        Err(LeanWorkerError::ChildPanicOrAbort { exit }) => Ok(exit),
        Err(failure) => Err(failure),
        Ok(response) => Err(unexpected_response(OPERATION, &response)),
    }
}
#[doc(hidden)]
pub fn __emit_test_rows(
&mut self,
streams: Vec<String>,
cancellation: Option<&LeanWorkerCancellationToken>,
data: Option<&dyn LeanWorkerDataSink>,
) -> Result<u64, LeanWorkerError> {
const OPERATION: &str = "emit_test_rows";
check_cancelled(OPERATION, cancellation)?;
self.prepare_request(false)?;
self.send_request(Request::EmitTestRows { streams })?;
self.record_request(false);
match self.read_response_with_events(
OPERATION,
None,
cancellation,
data.map(LeanWorkerDataSinkTarget::Value),
None,
)? {
Response::RowsComplete { count } => Ok(count),
other @ (Response::HealthOk
| Response::CapabilityLoaded
| Response::U64 { .. }
| Response::HostSessionOpened
| Response::Elaboration { .. }
| Response::KernelCheck { .. }
| Response::Strings { .. }
| Response::StreamComplete { .. }
| Response::StreamExportFailed { .. }
| Response::StreamCallbackFailed { .. }
| Response::StreamRowMalformed { .. }
| Response::CapabilityMetadata { .. }
| Response::CapabilityDoctor { .. }
| Response::CapabilityMetadataMalformed { .. }
| Response::CapabilityDoctorMalformed { .. }
| Response::JsonCommand { .. }
| Response::Terminating
| Response::Error { .. }) => Err(unexpected_response(OPERATION, &other)),
}
}
pub(crate) fn open_worker_session(
&mut self,
config: &LeanWorkerSessionConfig,
cancellation: Option<&LeanWorkerCancellationToken>,
progress: Option<&dyn LeanWorkerProgressSink>,
) -> Result<(), LeanWorkerError> {
const OPERATION: &str = "open_worker_session";
check_cancelled(OPERATION, cancellation)?;
self.prepare_request(true)?;
self.send_request(Request::OpenHostSession {
project_root: config.project_root_string(),
package: config.package().to_owned(),
lib_name: config.lib_name().to_owned(),
imports: config.imports().to_vec(),
})?;
self.record_request(true);
match self.read_response_with_progress(OPERATION, progress, cancellation)? {
Response::HostSessionOpened => Ok(()),
other @ (Response::HealthOk
| Response::CapabilityLoaded
| Response::U64 { .. }
| Response::Elaboration { .. }
| Response::KernelCheck { .. }
| Response::Strings { .. }
| Response::StreamComplete { .. }
| Response::StreamExportFailed { .. }
| Response::StreamCallbackFailed { .. }
| Response::StreamRowMalformed { .. }
| Response::CapabilityMetadata { .. }
| Response::CapabilityDoctor { .. }
| Response::CapabilityMetadataMalformed { .. }
| Response::CapabilityDoctorMalformed { .. }
| Response::JsonCommand { .. }
| Response::RowsComplete { .. }
| Response::Terminating
| Response::Error { .. }) => Err(unexpected_response(OPERATION, &other)),
}
}
pub(crate) fn worker_elaborate(
&mut self,
source: &str,
options: &LeanWorkerElabOptions,
cancellation: Option<&LeanWorkerCancellationToken>,
progress: Option<&dyn LeanWorkerProgressSink>,
) -> Result<LeanWorkerElabResult, LeanWorkerError> {
const OPERATION: &str = "worker_elaborate";
check_cancelled(OPERATION, cancellation)?;
self.prepare_request(false)?;
self.send_request(Request::Elaborate {
source: source.to_owned(),
options: options.wire(),
})?;
self.record_request(false);
match self.read_response_with_progress(OPERATION, progress, cancellation)? {
Response::Elaboration { outcome } => Ok(outcome.into()),
other @ (Response::HealthOk
| Response::CapabilityLoaded
| Response::U64 { .. }
| Response::HostSessionOpened
| Response::KernelCheck { .. }
| Response::Strings { .. }
| Response::StreamComplete { .. }
| Response::StreamExportFailed { .. }
| Response::StreamCallbackFailed { .. }
| Response::StreamRowMalformed { .. }
| Response::CapabilityMetadata { .. }
| Response::CapabilityDoctor { .. }
| Response::CapabilityMetadataMalformed { .. }
| Response::CapabilityDoctorMalformed { .. }
| Response::JsonCommand { .. }
| Response::RowsComplete { .. }
| Response::Terminating
| Response::Error { .. }) => Err(unexpected_response(OPERATION, &other)),
}
}
pub(crate) fn worker_kernel_check(
&mut self,
source: &str,
options: &LeanWorkerElabOptions,
cancellation: Option<&LeanWorkerCancellationToken>,
progress: Option<&dyn LeanWorkerProgressSink>,
) -> Result<LeanWorkerKernelResult, LeanWorkerError> {
const OPERATION: &str = "worker_kernel_check";
check_cancelled(OPERATION, cancellation)?;
self.prepare_request(false)?;
self.send_request(Request::KernelCheck {
source: source.to_owned(),
options: options.wire(),
progress: progress.is_some(),
})?;
self.record_request(false);
match self.read_response_with_progress(OPERATION, progress, cancellation)? {
Response::KernelCheck { outcome } => Ok(outcome.into()),
other @ (Response::HealthOk
| Response::CapabilityLoaded
| Response::U64 { .. }
| Response::HostSessionOpened
| Response::Elaboration { .. }
| Response::Strings { .. }
| Response::StreamComplete { .. }
| Response::StreamExportFailed { .. }
| Response::StreamCallbackFailed { .. }
| Response::StreamRowMalformed { .. }
| Response::CapabilityMetadata { .. }
| Response::CapabilityDoctor { .. }
| Response::CapabilityMetadataMalformed { .. }
| Response::CapabilityDoctorMalformed { .. }
| Response::JsonCommand { .. }
| Response::RowsComplete { .. }
| Response::Terminating
| Response::Error { .. }) => Err(unexpected_response(OPERATION, &other)),
}
}
pub(crate) fn worker_declaration_kinds(
&mut self,
names: &[&str],
cancellation: Option<&LeanWorkerCancellationToken>,
progress: Option<&dyn LeanWorkerProgressSink>,
) -> Result<Vec<String>, LeanWorkerError> {
const OPERATION: &str = "worker_declaration_kinds";
check_cancelled(OPERATION, cancellation)?;
self.prepare_request(false)?;
self.send_request(Request::DeclarationKinds {
names: names.iter().map(|name| (*name).to_owned()).collect(),
progress: progress.is_some(),
})?;
self.record_request(false);
match self.read_response_with_progress(OPERATION, progress, cancellation)? {
Response::Strings { values } => Ok(values),
other @ (Response::HealthOk
| Response::CapabilityLoaded
| Response::U64 { .. }
| Response::HostSessionOpened
| Response::Elaboration { .. }
| Response::KernelCheck { .. }
| Response::StreamComplete { .. }
| Response::StreamExportFailed { .. }
| Response::StreamCallbackFailed { .. }
| Response::StreamRowMalformed { .. }
| Response::CapabilityMetadata { .. }
| Response::CapabilityDoctor { .. }
| Response::CapabilityMetadataMalformed { .. }
| Response::CapabilityDoctorMalformed { .. }
| Response::JsonCommand { .. }
| Response::RowsComplete { .. }
| Response::Terminating
| Response::Error { .. }) => Err(unexpected_response(OPERATION, &other)),
}
}
pub(crate) fn worker_declaration_names(
&mut self,
names: &[&str],
cancellation: Option<&LeanWorkerCancellationToken>,
progress: Option<&dyn LeanWorkerProgressSink>,
) -> Result<Vec<String>, LeanWorkerError> {
const OPERATION: &str = "worker_declaration_names";
check_cancelled(OPERATION, cancellation)?;
self.prepare_request(false)?;
self.send_request(Request::DeclarationNames {
names: names.iter().map(|name| (*name).to_owned()).collect(),
progress: progress.is_some(),
})?;
self.record_request(false);
match self.read_response_with_progress(OPERATION, progress, cancellation)? {
Response::Strings { values } => Ok(values),
other @ (Response::HealthOk
| Response::CapabilityLoaded
| Response::U64 { .. }
| Response::HostSessionOpened
| Response::Elaboration { .. }
| Response::KernelCheck { .. }
| Response::StreamComplete { .. }
| Response::StreamExportFailed { .. }
| Response::StreamCallbackFailed { .. }
| Response::StreamRowMalformed { .. }
| Response::CapabilityMetadata { .. }
| Response::CapabilityDoctor { .. }
| Response::CapabilityMetadataMalformed { .. }
| Response::CapabilityDoctorMalformed { .. }
| Response::JsonCommand { .. }
| Response::RowsComplete { .. }
| Response::Terminating
| Response::Error { .. }) => Err(unexpected_response(OPERATION, &other)),
}
}
/// Runs the streaming export `export`, delivering rows to a value data sink.
///
/// Wraps `rows` in `LeanWorkerDataSinkTarget::Value` and delegates to
/// `worker_run_data_stream_with_sink`.
pub(crate) fn worker_run_data_stream(
    &mut self,
    export: &str,
    request: &serde_json::Value,
    rows: &dyn LeanWorkerDataSink,
    diagnostics: Option<&dyn LeanWorkerDiagnosticSink>,
    cancellation: Option<&LeanWorkerCancellationToken>,
    progress: Option<&dyn LeanWorkerProgressSink>,
) -> Result<LeanWorkerStreamSummary, LeanWorkerError> {
    let value_sink = LeanWorkerDataSinkTarget::Value(rows);
    self.worker_run_data_stream_with_sink(export, request, value_sink, diagnostics, cancellation, progress)
}
/// Runs the streaming export `export`, delivering rows to a raw data sink.
///
/// Wraps `rows` in `LeanWorkerDataSinkTarget::Raw` and delegates to
/// `worker_run_data_stream_with_sink`.
pub(crate) fn worker_run_data_stream_raw(
    &mut self,
    export: &str,
    request: &serde_json::Value,
    rows: &dyn LeanWorkerRawDataSink,
    diagnostics: Option<&dyn LeanWorkerDiagnosticSink>,
    cancellation: Option<&LeanWorkerCancellationToken>,
    progress: Option<&dyn LeanWorkerProgressSink>,
) -> Result<LeanWorkerStreamSummary, LeanWorkerError> {
    let raw_sink = LeanWorkerDataSinkTarget::Raw(rows);
    self.worker_run_data_stream_with_sink(export, request, raw_sink, diagnostics, cancellation, progress)
}
/// Shared implementation behind the value and raw data-stream entry points.
///
/// Checks `cancellation`, JSON-encodes `request`, sends `Request::RunDataStream`
/// and bumps `stats.stream_requests`. It then reads the response while events
/// are routed to `rows`, `diagnostics` and `progress`.
///
/// # Errors
///
/// * `Protocol` — `request` could not be JSON-encoded.
/// * `StreamExportFailed`, `StreamCallbackFailed`, `StreamRowMalformed` —
///   mapped one-to-one from the worker's matching responses.
/// * the error built by `unexpected_response` for any other non-success response.
fn worker_run_data_stream_with_sink(
    &mut self,
    export: &str,
    request: &serde_json::Value,
    rows: LeanWorkerDataSinkTarget<'_>,
    diagnostics: Option<&dyn LeanWorkerDiagnosticSink>,
    cancellation: Option<&LeanWorkerCancellationToken>,
    progress: Option<&dyn LeanWorkerProgressSink>,
) -> Result<LeanWorkerStreamSummary, LeanWorkerError> {
    const OPERATION: &str = "worker_run_data_stream";

    check_cancelled(OPERATION, cancellation)?;
    // Encode before touching the worker so a bad request never reaches the wire.
    let request_json = match serde_json::to_string(request) {
        Ok(json) => json,
        Err(err) => {
            return Err(LeanWorkerError::Protocol {
                message: format!("worker data-stream request JSON encode failed: {err}"),
            });
        }
    };

    self.prepare_request(false)?;
    let wire_request = Request::RunDataStream {
        export: export.to_owned(),
        request_json,
        progress: progress.is_some(),
    };
    self.send_request(wire_request)?;
    self.record_request(false);
    self.stats.stream_requests = self.stats.stream_requests.saturating_add(1);

    let response = self.read_response_with_events(OPERATION, progress, cancellation, Some(rows), diagnostics)?;
    match response {
        Response::StreamComplete { summary } => Ok(summary.into()),
        Response::StreamExportFailed { status_byte } => {
            Err(LeanWorkerError::StreamExportFailed { status: status_byte })
        }
        Response::StreamCallbackFailed {
            status_byte,
            description,
        } => Err(LeanWorkerError::StreamCallbackFailed {
            status: status_byte,
            description,
        }),
        Response::StreamRowMalformed { message } => Err(LeanWorkerError::StreamRowMalformed { message }),
        unexpected @ (Response::HealthOk
        | Response::CapabilityLoaded
        | Response::U64 { .. }
        | Response::HostSessionOpened
        | Response::Elaboration { .. }
        | Response::KernelCheck { .. }
        | Response::Strings { .. }
        | Response::RowsComplete { .. }
        | Response::CapabilityMetadata { .. }
        | Response::CapabilityDoctor { .. }
        | Response::CapabilityMetadataMalformed { .. }
        | Response::CapabilityDoctorMalformed { .. }
        | Response::JsonCommand { .. }
        | Response::Terminating
        | Response::Error { .. }) => Err(unexpected_response(OPERATION, &unexpected)),
    }
}
pub(crate) fn worker_capability_metadata(
&mut self,
export: &str,
request: &serde_json::Value,
cancellation: Option<&LeanWorkerCancellationToken>,
progress: Option<&dyn LeanWorkerProgressSink>,
) -> Result<LeanWorkerCapabilityMetadata, LeanWorkerError> {
const OPERATION: &str = "worker_capability_metadata";
check_cancelled(OPERATION, cancellation)?;
let request_json = serde_json::to_string(request).map_err(|err| LeanWorkerError::Protocol {
message: format!("worker capability metadata request JSON encode failed: {err}"),
})?;
self.prepare_request(false)?;
self.send_request(Request::CapabilityMetadata {
export: export.to_owned(),
request_json,
})?;
self.record_request(false);
match self.read_response_with_progress(OPERATION, progress, cancellation)? {
Response::CapabilityMetadata { metadata } => Ok(metadata.into()),
Response::CapabilityMetadataMalformed { message } => {
Err(LeanWorkerError::CapabilityMetadataMalformed { message })
}
other @ (Response::HealthOk
| Response::CapabilityLoaded
| Response::U64 { .. }
| Response::HostSessionOpened
| Response::Elaboration { .. }
| Response::KernelCheck { .. }
| Response::Strings { .. }
| Response::StreamComplete { .. }
| Response::StreamExportFailed { .. }
| Response::StreamCallbackFailed { .. }
| Response::StreamRowMalformed { .. }
| Response::CapabilityDoctor { .. }
| Response::CapabilityDoctorMalformed { .. }
| Response::JsonCommand { .. }
| Response::RowsComplete { .. }
| Response::Terminating
| Response::Error { .. }) => Err(unexpected_response(OPERATION, &other)),
}
}
pub(crate) fn worker_capability_doctor(
&mut self,
export: &str,
request: &serde_json::Value,
cancellation: Option<&LeanWorkerCancellationToken>,
progress: Option<&dyn LeanWorkerProgressSink>,
) -> Result<LeanWorkerDoctorReport, LeanWorkerError> {
const OPERATION: &str = "worker_capability_doctor";
check_cancelled(OPERATION, cancellation)?;
let request_json = serde_json::to_string(request).map_err(|err| LeanWorkerError::Protocol {
message: format!("worker capability doctor request JSON encode failed: {err}"),
})?;
self.prepare_request(false)?;
self.send_request(Request::CapabilityDoctor {
export: export.to_owned(),
request_json,
})?;
self.record_request(false);
match self.read_response_with_progress(OPERATION, progress, cancellation)? {
Response::CapabilityDoctor { report } => Ok(report.into()),
Response::CapabilityDoctorMalformed { message } => {
Err(LeanWorkerError::CapabilityDoctorMalformed { message })
}
other @ (Response::HealthOk
| Response::CapabilityLoaded
| Response::U64 { .. }
| Response::HostSessionOpened
| Response::Elaboration { .. }
| Response::KernelCheck { .. }
| Response::Strings { .. }
| Response::StreamComplete { .. }
| Response::StreamExportFailed { .. }
| Response::StreamCallbackFailed { .. }
| Response::StreamRowMalformed { .. }
| Response::CapabilityMetadata { .. }
| Response::CapabilityMetadataMalformed { .. }
| Response::JsonCommand { .. }
| Response::RowsComplete { .. }
| Response::Terminating
| Response::Error { .. }) => Err(unexpected_response(OPERATION, &other)),
}
}
pub(crate) fn worker_json_command(
&mut self,
export: &str,
request_json: String,
cancellation: Option<&LeanWorkerCancellationToken>,
progress: Option<&dyn LeanWorkerProgressSink>,
) -> Result<String, LeanWorkerError> {
const OPERATION: &str = "worker_json_command";
check_cancelled(OPERATION, cancellation)?;
self.prepare_request(false)?;
self.send_request(Request::JsonCommand {
export: export.to_owned(),
request_json,
})?;
self.record_request(false);
match self.read_response_with_progress(OPERATION, progress, cancellation)? {
Response::JsonCommand { response_json } => Ok(response_json),
other @ (Response::HealthOk
| Response::CapabilityLoaded
| Response::U64 { .. }
| Response::HostSessionOpened
| Response::Elaboration { .. }
| Response::KernelCheck { .. }
| Response::Strings { .. }
| Response::StreamComplete { .. }
| Response::StreamExportFailed { .. }
| Response::StreamCallbackFailed { .. }
| Response::StreamRowMalformed { .. }
| Response::CapabilityMetadata { .. }
| Response::CapabilityDoctor { .. }
| Response::CapabilityMetadataMalformed { .. }
| Response::CapabilityDoctorMalformed { .. }
| Response::RowsComplete { .. }
| Response::Terminating
| Response::Error { .. }) => Err(unexpected_response(OPERATION, &other)),
}
}
/// Writes `request` to the worker's stdin as a single request frame.
///
/// # Errors
///
/// Fails when the worker is no longer running, when its stdin handle is gone, or when writing
/// the frame fails (reported as `LeanWorkerError::Protocol` carrying the write error's text).
fn send_request(&mut self, request: Request) -> Result<(), LeanWorkerError> {
    self.ensure_running()?;
    if let Some(stdin) = self.stdin.as_mut() {
        return write_frame(stdin, Message::Request(request)).map_err(|err| {
            let message = err.to_string();
            LeanWorkerError::Protocol { message }
        });
    }
    Err(self.dead_error())
}
/// Applies the configured restart policy before a request is sent.
///
/// After confirming the worker is running, the limits are checked in a fixed order (request
/// count, import count for `import_like` requests, resident-set size, idle time). The first
/// limit that is reached restarts the worker and ends the check.
///
/// The RSS check also updates `stats.last_rss_kib` when a sample is available and bumps
/// `stats.rss_samples_unavailable` when it is not.
fn prepare_request(&mut self, import_like: bool) -> Result<(), LeanWorkerError> {
    self.ensure_running()?;

    let request_limit = self
        .config
        .restart_policy
        .max_requests
        .filter(|limit| self.requests_since_restart >= *limit);
    if let Some(limit) = request_limit {
        return self.restart_with_reason(LeanWorkerRestartReason::MaxRequests { limit });
    }

    let import_limit = self
        .config
        .restart_policy
        .max_imports
        .filter(|limit| import_like && self.imports_since_restart >= *limit);
    if let Some(limit) = import_limit {
        return self.restart_with_reason(LeanWorkerRestartReason::MaxImports { limit });
    }

    if let Some(limit_kib) = self.config.restart_policy.max_rss_kib {
        if let Some(current_kib) = self.child_rss_kib() {
            // The sample is recorded whether or not it trips the ceiling.
            self.stats.last_rss_kib = Some(current_kib);
            if current_kib >= limit_kib {
                return self.restart_with_reason(LeanWorkerRestartReason::RssCeiling { current_kib, limit_kib });
            }
        } else {
            self.stats.rss_samples_unavailable = self.stats.rss_samples_unavailable.saturating_add(1);
        }
    }

    let idle = self
        .config
        .restart_policy
        .idle_restart_after
        .map(|limit| (self.last_activity.elapsed(), limit))
        .filter(|(idle_for, limit)| idle_for >= limit);
    if let Some((idle_for, limit)) = idle {
        return self.restart_with_reason(LeanWorkerRestartReason::Idle { idle_for, limit });
    }

    Ok(())
}
/// Accounts for one request that has just been sent.
///
/// Bumps the lifetime and since-restart request counters (and the import counters when
/// `import_like` is set), then stamps `last_activity` with the current time.
fn record_request(&mut self, import_like: bool) {
    let stats = &mut self.stats;
    stats.requests = stats.requests.saturating_add(1);
    if import_like {
        stats.imports = stats.imports.saturating_add(1);
    }

    self.requests_since_restart = self.requests_since_restart.saturating_add(1);
    if import_like {
        self.imports_since_restart = self.imports_since_restart.saturating_add(1);
    }

    self.last_activity = Instant::now();
}
/// Replaces the current child with a freshly spawned worker and records `reason` in the stats.
///
/// The existing child is stopped first. The accumulated statistics are carried over to the
/// new worker, and its idle clock starts at the moment of the restart.
///
/// # Errors
///
/// Propagates failures from stopping the old child or spawning the new one. In both cases
/// `self` is left without the replacement worker.
fn restart_with_reason(&mut self, reason: LeanWorkerRestartReason) -> Result<(), LeanWorkerError> {
    // Cloned up front because `self` is overwritten wholesale below.
    let spawn_config = self.config.clone();
    self.stop_existing_child()?;
    self.stats.record_restart(reason);
    (self.requests_since_restart, self.imports_since_restart) = (0, 0);

    let mut replacement = Self::spawn(&spawn_config)?;
    replacement.stats = self.stats.clone();
    replacement.last_activity = Instant::now();
    drop(std::mem::replace(self, replacement));
    Ok(())
}
/// Waits for the terminal response to `operation` with no progress sink, cancellation token,
/// data-row sink, or diagnostic sink attached.
///
/// Thin convenience wrapper over `read_response_with_events`; errors are passed through as-is.
fn read_response(&mut self, operation: &'static str) -> Result<Response, LeanWorkerError> {
self.read_response_with_events(operation, None, None, None, None)
}
/// Waits for the terminal response to `operation`, forwarding `progress` and `cancellation`
/// to `read_response_with_events`.
///
/// No data-row sink or diagnostic sink is attached (both are passed as `None`).
/// Errors are passed through as-is.
fn read_response_with_progress(
&mut self,
operation: &'static str,
progress: Option<&dyn LeanWorkerProgressSink>,
cancellation: Option<&LeanWorkerCancellationToken>,
) -> Result<Response, LeanWorkerError> {
self.read_response_with_events(operation, progress, cancellation, None, None)
}
fn read_response_with_events(
&mut self,
operation: &'static str,
progress: Option<&dyn LeanWorkerProgressSink>,
cancellation: Option<&LeanWorkerCancellationToken>,
data: Option<LeanWorkerDataSinkTarget<'_>>,
diagnostics: Option<&dyn LeanWorkerDiagnosticSink>,
) -> Result<Response, LeanWorkerError> {
let started = Instant::now();
let timeout = self.config.request_timeout;
let deadline = started.checked_add(timeout);
let streaming = data.is_some();
let mut request_backpressure_waits = 0_u64;
let stdout = self.stdout.take().ok_or_else(|| self.dead_error())?;
let (sender, receiver) = mpsc::sync_channel(WORKER_EVENT_BUFFER_CAPACITY);
let _reader = thread::spawn(move || read_request_messages(stdout, sender));
loop {
let event = match deadline.and_then(|deadline| deadline.checked_duration_since(Instant::now())) {
Some(remaining) if remaining.is_zero() => {
if streaming {
self.record_stream_failure(started, request_backpressure_waits);
}
self.restart_with_reason(LeanWorkerRestartReason::RequestTimeout {
operation,
duration: timeout,
})?;
return Err(LeanWorkerError::Timeout {
operation,
duration: timeout,
});
}
Some(remaining) => match receiver.recv_timeout(remaining) {
Ok(event) => event,
Err(mpsc::RecvTimeoutError::Timeout) => {
if streaming {
self.record_stream_failure(started, request_backpressure_waits);
}
self.restart_with_reason(LeanWorkerRestartReason::RequestTimeout {
operation,
duration: timeout,
})?;
return Err(LeanWorkerError::Timeout {
operation,
duration: timeout,
});
}
Err(mpsc::RecvTimeoutError::Disconnected) => {
return Err(LeanWorkerError::Protocol {
message: "worker response reader exited without a terminal response".to_owned(),
});
}
},
None => match receiver.recv() {
Ok(event) => event,
Err(_err) => {
return Err(LeanWorkerError::Protocol {
message: "worker response reader exited without a terminal response".to_owned(),
});
}
},
};
request_backpressure_waits = request_backpressure_waits.saturating_add(event.backpressure_waits());
self.stats.backpressure_waits = self.stats.backpressure_waits.saturating_add(event.backpressure_waits());
let message = match event {
RequestReaderEvent::Message { message, .. } => message,
RequestReaderEvent::Terminal { message, stdout, .. } => {
self.stdout = Some(stdout);
match message {
Message::Response(Response::Error { code, message }) => {
if streaming {
self.record_stream_failure(started, request_backpressure_waits);
}
return Err(LeanWorkerError::Worker { code, message });
}
Message::Response(response) => {
if streaming {
if matches!(response, Response::StreamComplete { .. }) {
self.record_stream_success(started);
} else {
self.record_stream_failure(started, request_backpressure_waits);
}
}
return Ok(response);
}
other @ (Message::Handshake { .. }
| Message::Request(_)
| Message::Diagnostic(_)
| Message::ProgressTick(_)
| Message::DataRow(_)
| Message::FatalExit(_)) => {
return Err(LeanWorkerError::Protocol {
message: format!("worker sent unexpected {operation} message: {other:?}"),
});
}
}
}
RequestReaderEvent::ReadError { message, eof, .. } => {
if streaming {
self.record_stream_failure(started, request_backpressure_waits);
}
return if eof {
Err(self.record_exit_error())
} else {
Err(LeanWorkerError::Protocol { message })
};
}
};
match message {
Message::ProgressTick(tick) => {
if let Err(err) =
report_parent_progress(progress, elapsed_event(tick.phase, tick.current, tick.total, started))
{
if streaming {
self.record_stream_failure(started, request_backpressure_waits);
}
return Err(err);
}
if cancellation.is_some_and(LeanWorkerCancellationToken::is_cancelled) {
if streaming {
self.record_stream_failure(started, request_backpressure_waits);
}
self.restart_with_reason(LeanWorkerRestartReason::Cancelled { operation })?;
return Err(LeanWorkerError::Cancelled { operation });
}
}
Message::DataRow(row) => {
let payload_bytes = row.payload.get().len() as u64;
if let Err(err) = report_parent_data_row(data, row) {
if streaming {
self.record_stream_failure(started, request_backpressure_waits);
}
return Err(err);
}
self.stats.data_rows_delivered = self.stats.data_rows_delivered.saturating_add(1);
self.stats.data_row_payload_bytes = self.stats.data_row_payload_bytes.saturating_add(payload_bytes);
if cancellation.is_some_and(LeanWorkerCancellationToken::is_cancelled) {
if streaming {
self.record_stream_failure(started, request_backpressure_waits);
}
self.restart_with_reason(LeanWorkerRestartReason::Cancelled { operation })?;
return Err(LeanWorkerError::Cancelled { operation });
}
}
Message::Diagnostic(diagnostic) => {
if let Err(err) = report_parent_diagnostic(diagnostics, diagnostic.into()) {
if streaming {
self.record_stream_failure(started, request_backpressure_waits);
}
return Err(err);
}
}
Message::Response(response) => return Err(unexpected_response(operation, &response)),
other @ (Message::Handshake { .. } | Message::Request(_) | Message::FatalExit(_)) => {
return Err(LeanWorkerError::Protocol {
message: format!("worker sent unexpected {operation} message: {other:?}"),
});
}
}
}
}
/// Confirms the worker child is still running.
///
/// # Errors
///
/// Propagates a failure from `status()`. For a child that has exited, returns
/// `LeanWorkerError::ChildExited` for a successful exit and
/// `LeanWorkerError::ChildPanicOrAbort` otherwise.
fn ensure_running(&mut self) -> Result<(), LeanWorkerError> {
    let exit = match self.status()? {
        LeanWorkerStatus::Running => return Ok(()),
        LeanWorkerStatus::Exited(exit) => exit,
    };
    Err(if exit.success {
        LeanWorkerError::ChildExited { exit }
    } else {
        LeanWorkerError::ChildPanicOrAbort { exit }
    })
}
/// Counts one successfully completed stream and adds the time since `started` to the
/// accumulated stream time.
fn record_stream_success(&mut self, started: Instant) {
    let stats = &mut self.stats;
    stats.stream_successes = stats.stream_successes.saturating_add(1);
    stats.stream_elapsed = stats.stream_elapsed.saturating_add(started.elapsed());
}
/// Counts one failed stream and adds the time since `started` to the accumulated stream time.
///
/// The failure is additionally counted as a backpressure failure when the request observed
/// at least one backpressure wait.
fn record_stream_failure(&mut self, started: Instant, backpressure_waits: u64) {
    let stats = &mut self.stats;
    stats.stream_failures = stats.stream_failures.saturating_add(1);
    stats.stream_elapsed = stats.stream_elapsed.saturating_add(started.elapsed());
    if backpressure_waits == 0 {
        return;
    }
    stats.backpressure_failures = stats.backpressure_failures.saturating_add(1);
}
/// Blocks until the child exits, then records how it exited.
///
/// On success the exit is stored in `last_exit`, the child and its pipe handles are cleared,
/// the exit counter is bumped, and the recorded exit (including captured stderr text) is
/// returned.
///
/// # Errors
///
/// Returns `dead_error()` when there is no child to wait for, and `LeanWorkerError::Wait`
/// when waiting on the child fails.
fn wait_for_exit(&mut self) -> Result<LeanWorkerExit, LeanWorkerError> {
    let Some(child) = self.child.as_mut() else {
        return Err(self.dead_error());
    };
    // Close our end of the child's stdin before blocking. `Child::wait` normally does this to
    // keep a child that is waiting for input from deadlocking the parent, but it cannot here
    // because the stdin handle lives in `self.stdin` rather than inside `Child`.
    self.stdin = None;
    let status = child.wait().map_err(|source| LeanWorkerError::Wait { source })?;
    let diagnostics = self.read_stderr();
    let exit = LeanWorkerExit::from_status(status, diagnostics);
    self.last_exit = Some(exit.clone());
    self.child = None;
    self.stdout = None;
    self.stats.exits = self.stats.exits.saturating_add(1);
    Ok(exit)
}
/// Non-blocking check for a finished child.
///
/// Returns `None` when there is no child, when the child is still running, or when polling
/// it fails. Otherwise records the exit exactly as `wait_for_exit` does (last exit, cleared
/// handles, exit counter) and returns it.
fn try_record_exit(&mut self) -> Option<LeanWorkerExit> {
    let status = match self.child.as_mut()?.try_wait() {
        Ok(Some(status)) => status,
        // Still running, or the poll itself failed: nothing to record.
        Ok(None) | Err(_) => return None,
    };
    let exit = LeanWorkerExit::from_status(status, self.read_stderr());
    self.child = None;
    self.stdin = None;
    self.stdout = None;
    self.stats.exits = self.stats.exits.saturating_add(1);
    self.last_exit = Some(exit.clone());
    Some(exit)
}
/// Waits for the child to exit and converts the outcome into the error to report.
///
/// A successful exit becomes `LeanWorkerError::ChildExited`, any other exit becomes
/// `LeanWorkerError::ChildPanicOrAbort`. If waiting itself fails, that error is returned.
fn record_exit_error(&mut self) -> LeanWorkerError {
    let exit = match self.wait_for_exit() {
        Ok(exit) => exit,
        Err(err) => return err,
    };
    if exit.success {
        LeanWorkerError::ChildExited { exit }
    } else {
        LeanWorkerError::ChildPanicOrAbort { exit }
    }
}
/// Kills and reaps the current child, if any, and clears the child and pipe handles.
///
/// The kill result is deliberately ignored. The exit status and captured stderr text are
/// stored in `last_exit`, and the exit counter is bumped.
///
/// # Errors
///
/// Returns `LeanWorkerError::Wait` when reaping the child fails. In that case the handles are
/// left untouched.
fn stop_existing_child(&mut self) -> Result<(), LeanWorkerError> {
    let reaped = self.child.as_mut().map(|child| {
        drop(child.kill());
        child.wait()
    });
    if let Some(wait_result) = reaped {
        let status = wait_result.map_err(|source| LeanWorkerError::Wait { source })?;
        let exit = LeanWorkerExit::from_status(status, self.read_stderr());
        self.last_exit = Some(exit);
        self.stats.exits = self.stats.exits.saturating_add(1);
    }
    self.child = None;
    self.stdin = None;
    self.stdout = None;
    Ok(())
}
/// Builds the error that describes a worker which is not running.
///
/// Uses the last recorded exit when there is one. Otherwise it synthesizes an unsuccessful
/// exit with the status text "worker is not running" and no diagnostics.
fn dead_error(&self) -> LeanWorkerError {
    let exit = match self.last_exit.clone() {
        Some(recorded) => recorded,
        None => LeanWorkerExit {
            success: false,
            code: None,
            status: "worker is not running".to_owned(),
            diagnostics: String::new(),
        },
    };
    if !exit.success {
        return LeanWorkerError::ChildPanicOrAbort { exit };
    }
    LeanWorkerError::ChildExited { exit }
}
/// Takes the child's stderr pipe and returns what can be read from it as text.
///
/// Best effort: read failures are ignored, and the result is empty when the pipe has already
/// been taken.
fn read_stderr(&mut self) -> String {
    let Some(mut pipe) = self.stderr.take() else {
        return String::new();
    };
    let mut captured = String::new();
    drop(pipe.read_to_string(&mut captured));
    captured
}
/// Samples the child's resident-set size in KiB via the free function `child_rss_kib`.
///
/// Returns `None` when there is no child or when no sample could be obtained.
fn child_rss_kib(&mut self) -> Option<u64> {
    let pid = self.child.as_mut().map(|child| child.id())?;
    child_rss_kib(pid)
}
}
/// Event sent from the stdout reader thread (`read_request_messages`) to the request loop.
///
/// Every variant carries `backpressure_waits`: it starts at 0 and is incremented by
/// `send_reader_event` when the bounded channel was full at the time the event was sent.
enum RequestReaderEvent {
/// A frame that is not a `Message::Response`; the reader keeps reading after sending it.
Message {
message: Message,
backpressure_waits: u64,
},
/// The `Message::Response` frame that ends the request. The reader thread stops after
/// sending it and hands the stdout reader back through `stdout`.
Terminal {
message: Message,
stdout: BufReader<ChildStdout>,
backpressure_waits: u64,
},
/// Reading a frame failed. `message` is the error text and `eof` is the read error's
/// `is_eof()` result. The reader thread stops after sending it.
ReadError {
message: String,
eof: bool,
backpressure_waits: u64,
},
}
impl RequestReaderEvent {
    /// Number of backpressure waits recorded on this event, whichever variant it is.
    fn backpressure_waits(&self) -> u64 {
        let (Self::Message {
            backpressure_waits: waits,
            ..
        }
        | Self::Terminal {
            backpressure_waits: waits,
            ..
        }
        | Self::ReadError {
            backpressure_waits: waits,
            ..
        }) = self;
        *waits
    }

    /// Records one more backpressure wait on this event (saturating).
    fn add_backpressure_wait(&mut self) {
        let (Self::Message {
            backpressure_waits: waits,
            ..
        }
        | Self::Terminal {
            backpressure_waits: waits,
            ..
        }
        | Self::ReadError {
            backpressure_waits: waits,
            ..
        }) = self;
        *waits = waits.saturating_add(1);
    }
}
/// Body of the per-request reader thread: forwards frames from the worker's stdout to `sender`.
///
/// The loop ends in one of three ways:
/// * a `Message::Response` frame arrives, which is sent as `Terminal` together with the
///   stdout reader so the caller can reuse it;
/// * reading a frame fails, which is sent as `ReadError`;
/// * a non-terminal frame cannot be delivered because the receiver is gone.
#[allow(
    clippy::needless_pass_by_value,
    reason = "the request reader thread must own the sender"
)]
fn read_request_messages(mut stdout: BufReader<ChildStdout>, sender: mpsc::SyncSender<RequestReaderEvent>) {
    loop {
        let frame = match read_frame(&mut stdout) {
            Ok(frame) => frame,
            Err(err) => {
                let failure = RequestReaderEvent::ReadError {
                    message: err.to_string(),
                    eof: err.is_eof(),
                    backpressure_waits: 0,
                };
                // Nothing left to do if the receiver is gone; the thread ends either way.
                let _ = send_reader_event(&sender, failure);
                return;
            }
        };

        if matches!(frame.message, Message::Response(_)) {
            let terminal = RequestReaderEvent::Terminal {
                message: frame.message,
                stdout,
                backpressure_waits: 0,
            };
            let _ = send_reader_event(&sender, terminal);
            return;
        }

        let intermediate = RequestReaderEvent::Message {
            message: frame.message,
            backpressure_waits: 0,
        };
        if send_reader_event(&sender, intermediate).is_err() {
            return;
        }
    }
}
/// Sends `event` to the request loop, noting on the event when the channel was full.
///
/// A non-blocking send is tried first. If the channel is full, one backpressure wait is
/// recorded on the event and the send is retried as a blocking send.
///
/// Returns `Err(())` when the receiver has hung up.
fn send_reader_event(sender: &mpsc::SyncSender<RequestReaderEvent>, event: RequestReaderEvent) -> Result<(), ()> {
    let mut delayed = match sender.try_send(event) {
        Ok(()) => return Ok(()),
        Err(mpsc::TrySendError::Disconnected(_unsent)) => return Err(()),
        Err(mpsc::TrySendError::Full(unsent)) => unsent,
    };
    delayed.add_backpressure_wait();
    sender.send(delayed).map_err(|_| ())
}
impl Drop for LeanWorker {
    /// Kills and reaps the child, if one is still attached, when the worker handle is dropped.
    ///
    /// Both results are ignored: there is nothing useful to do with a failure during drop.
    fn drop(&mut self) {
        let Some(child) = self.child.as_mut() else {
            return;
        };
        drop(child.kill());
        drop(child.wait());
    }
}
/// Reads the first frame from the child's stdout and requires it to be a handshake that
/// announces this crate's protocol version.
///
/// On success returns the worker's runtime metadata, with `lean_version` left unset.
///
/// # Errors
///
/// Returns `LeanWorkerError::Handshake` when the frame cannot be read (with a dedicated message
/// for EOF), when the frame is not a handshake, or when the protocol version does not match.
fn expect_handshake(stdout: &mut BufReader<ChildStdout>) -> Result<LeanWorkerRuntimeMetadata, LeanWorkerError> {
    let frame = match read_frame(stdout) {
        Ok(frame) => frame,
        Err(err) => {
            let message = if err.is_eof() {
                "child closed stdout before handshake".to_owned()
            } else {
                err.to_string()
            };
            return Err(LeanWorkerError::Handshake { message });
        }
    };
    let rejected = match frame.message {
        Message::Handshake {
            worker_version,
            protocol_version,
        } if protocol_version == crate::protocol::PROTOCOL_VERSION => {
            return Ok(LeanWorkerRuntimeMetadata {
                worker_version,
                protocol_version,
                lean_version: None,
            });
        }
        // Includes a handshake whose protocol version did not match the guard above.
        other @ (Message::Handshake { .. }
        | Message::Request(_)
        | Message::Response(_)
        | Message::Diagnostic(_)
        | Message::ProgressTick(_)
        | Message::DataRow(_)
        | Message::FatalExit(_)) => other,
    };
    Err(LeanWorkerError::Handshake {
        message: format!("unexpected handshake frame: {rejected:?}"),
    })
}
/// Waits for `child` to exit and pairs its status with whatever text can be read from `stderr`.
///
/// Reading stderr is best effort: failures are ignored, and a missing pipe yields empty
/// diagnostics.
///
/// # Errors
///
/// Returns `LeanWorkerError::Wait` when waiting on the child fails.
fn wait_with_stderr(child: &mut Child, stderr: Option<ChildStderr>) -> Result<LeanWorkerExit, LeanWorkerError> {
    let status = match child.wait() {
        Ok(status) => status,
        Err(source) => return Err(LeanWorkerError::Wait { source }),
    };
    let diagnostics = stderr
        .map(|mut pipe| {
            let mut captured = String::new();
            drop(pipe.read_to_string(&mut captured));
            captured
        })
        .unwrap_or_default();
    Ok(LeanWorkerExit::from_status(status, diagnostics))
}
/// Builds the protocol error for a response kind that does not belong to `operation`.
///
/// The message embeds the operation name and the debug rendering of `response`.
fn unexpected_response(operation: &'static str, response: &Response) -> LeanWorkerError {
    let message = format!("worker sent unexpected {operation} response: {response:?}");
    LeanWorkerError::Protocol { message }
}
/// Renders `path` as an owned string, replacing any non-UTF-8 sequences lossily.
fn path_string(path: &Path) -> String {
    String::from(path.to_string_lossy())
}
/// Reads the resident-set size of process `pid` from the `VmRSS:` line of `/proc/<pid>/status`.
///
/// Returns the first numeric field of that line, or `None` when the file cannot be read or
/// no `VmRSS:` line with a parsable number is found.
#[cfg(target_os = "linux")]
fn child_rss_kib(pid: u32) -> Option<u64> {
    let status_path = format!("/proc/{pid}/status");
    let contents = std::fs::read_to_string(status_path).ok()?;
    for line in contents.lines() {
        let Some(rest) = line.strip_prefix("VmRSS:") else {
            continue;
        };
        let Some(field) = rest.split_whitespace().next() else {
            continue;
        };
        if let Ok(kib) = field.parse::<u64>() {
            return Some(kib);
        }
    }
    None
}
/// Samples the resident-set size of process `pid` by running `ps -o rss= -p <pid>`.
///
/// Returns `None` when `ps` cannot be run, exits unsuccessfully, or prints something that is
/// not a positive integer.
#[cfg(not(target_os = "linux"))]
fn child_rss_kib(pid: u32) -> Option<u64> {
    let pid_arg = pid.to_string();
    let output = Command::new("ps")
        .arg("-o")
        .arg("rss=")
        .arg("-p")
        .arg(&pid_arg)
        .output()
        .ok()?;
    if !output.status.success() {
        return None;
    }
    match String::from_utf8_lossy(&output.stdout).trim().parse::<u64>() {
        Ok(kib) if kib > 0 => Some(kib),
        Ok(_) | Err(_) => None,
    }
}