use std::path::{Path, PathBuf};
use std::process::ExitCode;
use std::sync::{Arc, Mutex};
use std::time::Instant;
use lean_rs::error::host_internal;
use lean_rs::module::LeanIo;
use lean_rs::{
LeanCallbackFlow, LeanCallbackHandle, LeanCallbackStatus, LeanError, LeanResult, LeanRuntime, LeanStringEvent,
};
use lean_rs_host::host::process::{
CommandInfoNode, NameRefNode, ProcessFileOutcome, ProcessModuleOutcome, ProcessedFile, TacticInfoNode, TermInfoNode,
};
use lean_rs_host::meta::{self, LeanMetaOptions, LeanMetaResponse, LeanMetaTransparency};
use lean_rs_host::{
LeanCapabilities, LeanDeclarationFilter, LeanElabFailure, LeanElabOptions, LeanHost, LeanKernelOutcome,
LeanSession, LeanSeverity, LeanSourceRange,
};
use serde::Deserialize;
use serde_json::value::RawValue;
use crate::protocol::{
DataRowEmitter, Diagnostic, Message, ProgressTick, ProtocolError, Request, Response, StreamSummary, read_frame,
write_frame,
};
use crate::types::{
LeanWorkerCapabilityMetadata, LeanWorkerCommandInfo, LeanWorkerDeclarationFilter, LeanWorkerDeclarationRow,
LeanWorkerDiagnostic, LeanWorkerDoctorReport, LeanWorkerElabFailure, LeanWorkerElabOptions, LeanWorkerElabResult,
LeanWorkerKernelResult, LeanWorkerKernelStatus, LeanWorkerKernelSummary, LeanWorkerMetaResult,
LeanWorkerMetaTransparency, LeanWorkerNameRef, LeanWorkerProcessFileOutcome, LeanWorkerProcessModuleOutcome,
LeanWorkerProcessedFile, LeanWorkerRendered, LeanWorkerRendering, LeanWorkerSourceRange, LeanWorkerTacticInfo,
LeanWorkerTermInfo,
};
/// Cloneable handle used to write protocol frames to the worker's stdout.
///
/// Clones share a single `Stdout` behind an `Arc<Mutex<_>>`, so every frame
/// written through any clone goes through the same lock.
#[derive(Clone)]
struct ProtocolWriter {
// Shared, lock-protected stdout handle.
stdout: Arc<Mutex<std::io::Stdout>>,
}
impl ProtocolWriter {
fn new() -> Self {
Self {
stdout: Arc::new(Mutex::new(std::io::stdout())),
}
}
fn write(&self, message: Message) -> Result<(), ProtocolError> {
let mut stdout = self
.stdout
.lock()
.map_err(|_| ProtocolError::Io(std::io::Error::other("worker stdout mutex was poisoned")))?;
write_frame(&mut *stdout, message)
}
}
/// Entry point of the worker child: installs the abort handling, then serves
/// the stdio protocol until it ends.
///
/// Returns `ExitCode::SUCCESS` when the protocol loop finishes cleanly. On
/// error the message is printed to stderr and `ExitCode::FAILURE` is returned.
pub(crate) fn run_stdio() -> ExitCode {
    install_immediate_abort_exit();
    if let Err(err) = serve_stdio() {
        eprintln!("lean-rs-worker-child: {err}");
        return ExitCode::FAILURE;
    }
    ExitCode::SUCCESS
}
/// Makes SIGABRT terminate the process immediately and turns off core dumps.
///
/// Installs a SIGABRT handler that writes a fixed marker to stderr and calls
/// `_exit(134)`, then sets `RLIMIT_CORE` to zero and, on Linux, calls
/// `prctl(PR_SET_DUMPABLE, 0, ...)`. Every libc return value is deliberately
/// discarded (`let _ = ...`), so a failing call is silent.
#[cfg(unix)]
#[allow(
unsafe_code,
reason = "installing a signal handler and calling setrlimit/prctl require libc FFI"
)]
fn install_immediate_abort_exit() {
// Signal handler: only `write` and `_exit` are called here.
// NOTE(review): both are listed as async-signal-safe by POSIX — keep it that way if this body grows.
extern "C" fn on_sigabrt(_sig: libc::c_int) {
const MARKER: &[u8] = b"lean-rs-worker child: SIGABRT, exiting immediately\n";
// SAFETY: MARKER is a static byte slice, so the pointer/length pair passed to `write` is valid;
// `_exit` does not return.
unsafe {
let _ = libc::write(libc::STDERR_FILENO, MARKER.as_ptr().cast(), MARKER.len());
// 134 presumably mirrors the shell convention 128 + SIGABRT (6) — TODO confirm with the parent side.
libc::_exit(134);
}
}
// SAFETY: `action` is zero-initialised and then has its handler, mask and flags set before being
// passed by pointer to `sigaction`; the old-action pointer is null, so nothing is written back.
unsafe {
let mut action: libc::sigaction = std::mem::zeroed();
action.sa_sigaction = on_sigabrt as *const () as libc::sighandler_t;
libc::sigemptyset(&raw mut action.sa_mask);
// SA_RESETHAND: the disposition is reset once the handler has been entered.
action.sa_flags = libc::SA_RESETHAND;
let _ = libc::sigaction(libc::SIGABRT, &raw const action, std::ptr::null_mut());
}
// SAFETY: `limit` is a fully initialised local passed by pointer; `prctl` receives plain integers.
unsafe {
// Soft and hard core-file limits are both set to zero.
let limit = libc::rlimit {
rlim_cur: 0,
rlim_max: 0,
};
let _ = libc::setrlimit(libc::RLIMIT_CORE, &raw const limit);
#[cfg(target_os = "linux")]
{
let zero: libc::c_ulong = 0;
let _ = libc::prctl(libc::PR_SET_DUMPABLE, zero, zero, zero, zero);
}
}
}
/// Non-Unix variant: intentionally empty, no signal handler or core-dump
/// limits are installed on these targets.
#[cfg(not(unix))]
fn install_immediate_abort_exit() {}
#[allow(
clippy::significant_drop_tightening,
reason = "the child owns stdin/stdout for the full protocol loop"
)]
fn serve_stdio() -> Result<(), Box<dyn std::error::Error>> {
let runtime = LeanRuntime::init()?;
let stdin = std::io::stdin();
let mut reader = stdin.lock();
let writer = ProtocolWriter::new();
let mut host_session: Option<HostSessionState> = None;
writer.write(Message::Handshake {
worker_version: env!("CARGO_PKG_VERSION").to_owned(),
protocol_version: crate::protocol::PROTOCOL_VERSION,
})?;
loop {
let frame = read_frame(&mut reader)?;
let Message::Request(request) = frame.message else {
writer.write(Message::Response(Response::Error {
code: "lean_rs.worker.protocol.unexpected_frame".to_owned(),
message: "child expected request frame".to_owned(),
}))?;
continue;
};
match request {
Request::Health => {
writer.write(Message::Response(Response::HealthOk))?;
}
Request::LoadFixtureCapability { fixture_root } => {
let response = match load_fixture_capability(runtime, Path::new(&fixture_root)) {
Ok(()) => Response::CapabilityLoaded,
Err(err) => error_response(&err),
};
writer.write(Message::Response(response))?;
}
Request::CallFixtureMul { fixture_root, lhs, rhs } => {
let response = match call_fixture_mul(runtime, Path::new(&fixture_root), lhs, rhs) {
Ok(value) => Response::U64 { value },
Err(err) => error_response(&err),
};
writer.write(Message::Response(response))?;
}
Request::TriggerLeanPanic { fixture_root } => {
let response = match trigger_lean_panic(runtime, Path::new(&fixture_root)) {
Ok(()) => Response::Error {
code: "lean_rs.worker.panic_fixture_returned".to_owned(),
message: "Lean panic fixture returned instead of terminating the child".to_owned(),
},
Err(err) => error_response(&err),
};
writer.write(Message::Response(response))?;
}
Request::OpenHostSession {
project_root,
package,
lib_name,
imports,
} => {
let response = match HostSessionState::open(runtime, &project_root, &package, &lib_name, &imports) {
Ok(state) => {
host_session = Some(state);
Response::HostSessionOpened
}
Err(err) => error_response(&err),
};
writer.write(Message::Response(response))?;
}
Request::Elaborate { source, options } => {
let response = match host_session.as_mut() {
Some(state) => match state.elaborate(&source, &options) {
Ok(outcome) => Response::Elaboration { outcome },
Err(err) => error_response(&err),
},
None => missing_session_response(),
};
writer.write(Message::Response(response))?;
}
Request::KernelCheck {
source,
options,
progress,
} => {
let response = match host_session.as_mut() {
Some(state) => match state.kernel_check(&source, &options, progress, &writer) {
Ok(outcome) => Response::KernelCheck { outcome },
Err(err) => error_response(&err),
},
None => missing_session_response(),
};
writer.write(Message::Response(response))?;
}
Request::DeclarationKinds { names, progress } => {
let response = match host_session.as_mut() {
Some(state) => match state.declaration_kinds(&names, progress, &writer) {
Ok(values) => Response::Strings { values },
Err(err) => error_response(&err),
},
None => missing_session_response(),
};
writer.write(Message::Response(response))?;
}
Request::DeclarationNames { names, progress } => {
let response = match host_session.as_mut() {
Some(state) => match state.declaration_names(&names, progress, &writer) {
Ok(values) => Response::Strings { values },
Err(err) => error_response(&err),
},
None => missing_session_response(),
};
writer.write(Message::Response(response))?;
}
Request::RunDataStream {
export,
request_json,
progress,
} => {
let response = match host_session.as_mut() {
Some(state) => match state.run_data_stream(&export, &request_json, progress, &writer) {
Ok(summary) => Response::StreamComplete { summary },
Err(StreamRunError::Host(err)) => error_response(&err),
Err(StreamRunError::ExportStatus(status)) => {
Response::StreamExportFailed { status_byte: status }
}
Err(StreamRunError::CallbackStatus(status)) => Response::StreamCallbackFailed {
status_byte: status.as_abi(),
description: status.description().to_owned(),
},
Err(StreamRunError::MalformedRow(message)) => Response::StreamRowMalformed { message },
},
None => missing_session_response(),
};
writer.write(Message::Response(response))?;
}
Request::CapabilityMetadata { export, request_json } => {
let response = match host_session.as_mut() {
Some(state) => match state.capability_metadata(&export, &request_json) {
Ok(metadata) => Response::CapabilityMetadata { metadata },
Err(CapabilityJsonError::Host(err)) => error_response(&err),
Err(CapabilityJsonError::Malformed(message)) => {
Response::CapabilityMetadataMalformed { message }
}
},
None => missing_session_response(),
};
writer.write(Message::Response(response))?;
}
Request::CapabilityDoctor { export, request_json } => {
let response = match host_session.as_mut() {
Some(state) => match state.capability_doctor(&export, &request_json) {
Ok(report) => Response::CapabilityDoctor { report },
Err(CapabilityJsonError::Host(err)) => error_response(&err),
Err(CapabilityJsonError::Malformed(message)) => Response::CapabilityDoctorMalformed { message },
},
None => missing_session_response(),
};
writer.write(Message::Response(response))?;
}
Request::JsonCommand { export, request_json } => {
let response = match host_session.as_mut() {
Some(state) => match state.json_command(&export, &request_json) {
Ok(response_json) => Response::JsonCommand { response_json },
Err(err) => error_response(&err),
},
None => missing_session_response(),
};
writer.write(Message::Response(response))?;
}
Request::InferType { source, options } => {
let response = match host_session.as_mut() {
Some(state) => match state.infer_type(&source, &options) {
Ok(result) => Response::MetaExpr { result },
Err(err) => error_response(&err),
},
None => missing_session_response(),
};
writer.write(Message::Response(response))?;
}
Request::Whnf { source, options } => {
let response = match host_session.as_mut() {
Some(state) => match state.whnf(&source, &options) {
Ok(result) => Response::MetaExpr { result },
Err(err) => error_response(&err),
},
None => missing_session_response(),
};
writer.write(Message::Response(response))?;
}
Request::IsDefEq {
lhs,
rhs,
transparency,
options,
} => {
let response = match host_session.as_mut() {
Some(state) => match state.is_def_eq(&lhs, &rhs, transparency, &options) {
Ok(result) => Response::MetaBool { result },
Err(err) => error_response(&err),
},
None => missing_session_response(),
};
writer.write(Message::Response(response))?;
}
Request::Describe { name } => {
let response = match host_session.as_mut() {
Some(state) => match state.describe(&name) {
Ok(row) => Response::Declaration { row },
Err(err) => error_response(&err),
},
None => missing_session_response(),
};
writer.write(Message::Response(response))?;
}
Request::ListDeclarationsStrings { filter, progress } => {
let response = match host_session.as_mut() {
Some(state) => match state.list_declarations_strings(filter, progress, &writer) {
Ok(count) => Response::RowsComplete { count },
Err(err) => error_response(&err),
},
None => missing_session_response(),
};
writer.write(Message::Response(response))?;
}
Request::DescribeBulk { names, progress } => {
let response = match host_session.as_mut() {
Some(state) => match state.describe_bulk(&names, progress, &writer) {
Ok(rows) => Response::DeclarationBulk { rows },
Err(err) => error_response(&err),
},
None => missing_session_response(),
};
writer.write(Message::Response(response))?;
}
Request::ProcessFile { source, options } => {
let response = match host_session.as_mut() {
Some(state) => match state.process_file(&source, &options) {
Ok(outcome) => Response::ProcessFile { outcome },
Err(err) => error_response(&err),
},
None => missing_session_response(),
};
writer.write(Message::Response(response))?;
}
Request::ProcessModule { source, options } => {
let response = match host_session.as_mut() {
Some(state) => match state.process_module(&source, &options) {
Ok(outcome) => Response::ProcessModule { outcome },
Err(err) => error_response(&err),
},
None => missing_session_response(),
};
writer.write(Message::Response(response))?;
}
Request::EmitTestRows { streams } => {
let count = emit_test_rows(&writer, &streams)?;
writer.write(Message::Response(Response::RowsComplete { count }))?;
}
Request::EmitTestRowsThenExit => {
let _count = emit_test_rows(&writer, &["rows".to_owned()])?;
return Ok(());
}
Request::EmitTestRowsThenPanic => {
let _count = emit_test_rows(&writer, &["rows".to_owned()])?;
std::process::abort();
}
Request::Terminate => {
writer.write(Message::Response(Response::Terminating))?;
return Ok(());
}
}
}
}
/// Opens the Lake project at `fixture_root` and loads the `lean_rs_fixture` /
/// `LeanRsFixture` capabilities, discarding them once loading has succeeded.
///
/// # Errors
/// Propagates any `LeanError` from opening the project or loading capabilities.
fn load_fixture_capability(runtime: &'static LeanRuntime, fixture_root: &Path) -> LeanResult<()> {
    let host = LeanHost::from_lake_project(runtime, fixture_root)?;
    host.load_capabilities("lean_rs_fixture", "LeanRsFixture").map(drop)
}
/// Calls the fixture export `lean_rs_fixture_u64_mul` with `(lhs, rhs)` in a
/// fresh session that imports `LeanRsFixture.Scalars`.
///
/// # Errors
/// Propagates any `LeanError` from opening the project, loading capabilities,
/// creating the session, or the call itself.
fn call_fixture_mul(runtime: &'static LeanRuntime, fixture_root: &Path, lhs: u64, rhs: u64) -> LeanResult<u64> {
    let fixture_host = LeanHost::from_lake_project(runtime, fixture_root)?;
    let fixture_caps = fixture_host.load_capabilities("lean_rs_fixture", "LeanRsFixture")?;
    let mut scalars = fixture_caps.session(&["LeanRsFixture.Scalars"], None, None)?;
    let operands = (lhs, rhs);
    scalars.call_capability::<(u64, u64), u64>("lean_rs_fixture_u64_mul", operands, None)
}
/// Calls the fixture export `lean_rs_fixture_panic_unit` with argument `0` in a
/// fresh session that imports `LeanRsFixture.Effects`.
///
/// # Errors
/// Propagates any `LeanError` from opening the project, loading capabilities,
/// creating the session, or the call itself.
fn trigger_lean_panic(runtime: &'static LeanRuntime, fixture_root: &Path) -> LeanResult<()> {
    let fixture_host = LeanHost::from_lake_project(runtime, fixture_root)?;
    let fixture_caps = fixture_host.load_capabilities("lean_rs_fixture", "LeanRsFixture")?;
    let mut effects = fixture_caps.session(&["LeanRsFixture.Effects"], None, None)?;
    let argument: (u8,) = (0,);
    effects.call_capability::<(u8,), ()>("lean_rs_fixture_panic_unit", argument, None)
}
fn error_response(err: &LeanError) -> Response {
Response::Error {
code: err.code().as_str().to_owned(),
message: err.to_string(),
}
}
/// Builds the error response returned when a session-bound request arrives
/// before any host session has been opened.
fn missing_session_response() -> Response {
    const CODE: &str = "lean_rs.worker.session_missing";
    const MESSAGE: &str = "open a LeanWorkerSession before sending host-session requests";
    Response::Error {
        code: CODE.to_owned(),
        message: MESSAGE.to_owned(),
    }
}
/// State of one opened host session: the session itself plus the leaked host
/// and capabilities it borrows from.
struct HostSessionState {
// Leaked (`&'static`) host; kept only to anchor lifetimes, never read.
#[allow(dead_code, reason = "leaked host anchors the capability and session lifetimes")]
host: &'static LeanHost<'static>,
// Leaked (`&'static`) capabilities; kept only to anchor the session borrow, never read.
#[allow(dead_code, reason = "leaked capabilities anchor the session borrow")]
capabilities: &'static LeanCapabilities<'static, 'static>,
// The session every request method operates on.
session: LeanSession<'static, 'static>,
}
impl HostSessionState {
/// Opens a host session for the Lake project at `project_root`.
///
/// Loads the capabilities named by `package` / `lib_name` and creates a
/// session with the given `imports`. The host and the capabilities are leaked
/// with `Box::leak` so the session can borrow them for `'static`; nothing in
/// this function frees them again, including when a later step fails.
///
/// # Errors
/// Propagates any `LeanError` from opening the project, loading capabilities,
/// or creating the session.
fn open(
    runtime: &'static LeanRuntime,
    project_root: &str,
    package: &str,
    lib_name: &str,
    imports: &[String],
) -> LeanResult<Self> {
    let project = LeanHost::from_lake_project(runtime, Path::new(project_root))?;
    let host = Box::leak(Box::new(project));
    let loaded = host.load_capabilities(package, lib_name)?;
    let capabilities = Box::leak(Box::new(loaded));
    let import_names: Vec<&str> = imports.iter().map(String::as_str).collect();
    let session = capabilities.session(&import_names, None, None)?;
    Ok(Self {
        host,
        capabilities,
        session,
    })
}
/// Elaborates `source` in the session and reports success or diagnostics.
///
/// A successful elaboration yields an empty, non-truncated diagnostic list;
/// the elaborated expression itself is discarded. An elaboration failure is
/// converted with `elab_failure_outcome`.
///
/// # Errors
/// Propagates a `LeanError` when the session call itself fails.
fn elaborate(&mut self, source: &str, options: &LeanWorkerElabOptions) -> LeanResult<LeanWorkerElabResult> {
    let host_options = options.to_host_options();
    let elaborated = self.session.elaborate(source, None, &host_options, None)?;
    let result = elaborated.map_or_else(
        |failure| elab_failure_outcome(&failure),
        |_expr| LeanWorkerElabResult {
            success: true,
            diagnostics: Vec::new(),
            truncated: false,
        },
    );
    Ok(result)
}
/// Kernel-checks `source` and converts the outcome to its wire form.
///
/// With `progress` set, a `kernel_check` tick `0/1` is emitted before the
/// session call and `1/1` right after it, before any evidence is summarised.
/// A `Checked` outcome carries a summary (declaration name, kind, type
/// signature); the other outcomes are converted with `kernel_failure_outcome`.
///
/// # Errors
/// Propagates a `LeanError` from the kernel check or from summarising evidence.
fn kernel_check(
    &mut self,
    source: &str,
    options: &LeanWorkerElabOptions,
    progress: bool,
    writer: &ProtocolWriter,
) -> LeanResult<LeanWorkerKernelResult> {
    if progress {
        emit_progress(writer, "kernel_check", 0, Some(1));
    }
    let host_options = options.to_host_options();
    let outcome = self.session.kernel_check(source, &host_options, None, None)?;
    if progress {
        emit_progress(writer, "kernel_check", 1, Some(1));
    }
    // Failure-like outcomes leave early; only checked evidence continues below.
    let evidence = match outcome {
        LeanKernelOutcome::Checked(evidence) => evidence,
        LeanKernelOutcome::Rejected(failure) => {
            return Ok(kernel_failure_outcome(LeanWorkerKernelStatus::Rejected, &failure));
        }
        LeanKernelOutcome::Unavailable(failure) => {
            return Ok(kernel_failure_outcome(LeanWorkerKernelStatus::Unavailable, &failure));
        }
        LeanKernelOutcome::Unsupported(failure) => {
            return Ok(kernel_failure_outcome(LeanWorkerKernelStatus::Unsupported, &failure));
        }
    };
    let summary = self.session.summarize_evidence(&evidence, None)?;
    Ok(LeanWorkerKernelResult {
        status: LeanWorkerKernelStatus::Checked,
        diagnostics: Vec::new(),
        truncated: false,
        summary: Some(LeanWorkerKernelSummary {
            declaration_name: summary.declaration_name().to_owned(),
            kind: summary.kind().to_owned(),
            type_signature: summary.type_signature().to_owned(),
        }),
    })
}
/// Looks up the declaration kind of every entry in `names`.
///
/// Without `progress` a single bulk session call is made. With `progress` the
/// names are queried one at a time and a `declaration_kind_bulk` tick is
/// emitted after each one.
///
/// # Errors
/// Propagates the first `LeanError` returned by the session.
fn declaration_kinds(
    &mut self,
    names: &[String],
    progress: bool,
    writer: &ProtocolWriter,
) -> LeanResult<Vec<String>> {
    if !progress {
        let name_refs: Vec<&str> = names.iter().map(String::as_str).collect();
        return self.session.declaration_kind_bulk(&name_refs, None, None);
    }
    let total = Some(u64::try_from(names.len()).unwrap_or(u64::MAX));
    let mut kinds = Vec::with_capacity(names.len());
    for (idx, name) in names.iter().enumerate() {
        let kind = self.session.declaration_kind(name, None)?;
        kinds.push(kind);
        let done = u64::try_from(idx.saturating_add(1)).unwrap_or(u64::MAX);
        emit_progress(writer, "declaration_kind_bulk", done, total);
    }
    Ok(kinds)
}
/// Resolves every entry in `names` through the session's declaration-name query.
///
/// Without `progress` a single bulk session call is made. With `progress` the
/// names are queried one at a time and a `declaration_name_bulk` tick is
/// emitted after each one.
///
/// # Errors
/// Propagates the first `LeanError` returned by the session.
fn declaration_names(
    &mut self,
    names: &[String],
    progress: bool,
    writer: &ProtocolWriter,
) -> LeanResult<Vec<String>> {
    if !progress {
        let name_refs: Vec<&str> = names.iter().map(String::as_str).collect();
        return self.session.declaration_name_bulk(&name_refs, None, None);
    }
    let total = Some(u64::try_from(names.len()).unwrap_or(u64::MAX));
    let mut resolved = Vec::with_capacity(names.len());
    for (idx, name) in names.iter().enumerate() {
        let entry = self.session.declaration_name(name, None)?;
        resolved.push(entry);
        let done = u64::try_from(idx.saturating_add(1)).unwrap_or(u64::MAX);
        emit_progress(writer, "declaration_name_bulk", done, total);
    }
    Ok(resolved)
}
fn run_data_stream(
&mut self,
export: &str,
request_json: &str,
progress: bool,
writer: &ProtocolWriter,
) -> Result<StreamSummary, StreamRunError> {
if progress {
emit_progress(writer, "data_stream", 0, None);
}
let started = Instant::now();
let forwarder = Arc::new(Mutex::new(StreamForwarder::new(writer.clone(), progress)));
let row_error = Arc::new(Mutex::new(None::<StreamCallbackError>));
let callback_forwarder = Arc::clone(&forwarder);
let callback_error = Arc::clone(&row_error);
let callback = LeanCallbackHandle::<LeanStringEvent>::register(move |event| {
if callback_error.lock().map_or(true, |guard| guard.is_some()) {
return LeanCallbackFlow::Stop;
}
match parse_row_envelope(&event.value) {
Ok(StreamCallbackEvent::Row(row)) => match callback_forwarder.lock() {
Ok(mut guard) => match guard.emit_row(row) {
Ok(()) => LeanCallbackFlow::Continue,
Err(err) => {
if let Ok(mut guard) = callback_error.lock() {
*guard = Some(StreamCallbackError::Write(err.to_string()));
}
LeanCallbackFlow::Stop
}
},
Err(_) => {
if let Ok(mut guard) = callback_error.lock() {
*guard = Some(StreamCallbackError::Malformed(
"stream forwarder mutex was poisoned".to_owned(),
));
}
LeanCallbackFlow::Stop
}
},
Ok(StreamCallbackEvent::Diagnostic(diagnostic)) => match callback_forwarder.lock() {
Ok(guard) => match guard.emit_diagnostic(diagnostic) {
Ok(()) => LeanCallbackFlow::Continue,
Err(err) => {
if let Ok(mut guard) = callback_error.lock() {
*guard = Some(StreamCallbackError::Write(err.to_string()));
}
LeanCallbackFlow::Stop
}
},
Err(_) => {
if let Ok(mut guard) = callback_error.lock() {
*guard = Some(StreamCallbackError::Malformed(
"stream forwarder mutex was poisoned".to_owned(),
));
}
LeanCallbackFlow::Stop
}
},
Ok(StreamCallbackEvent::Progress(progress)) => match callback_forwarder.lock() {
Ok(guard) => match guard.emit_progress(progress) {
Ok(()) => LeanCallbackFlow::Continue,
Err(err) => {
if let Ok(mut guard) = callback_error.lock() {
*guard = Some(StreamCallbackError::Write(err.to_string()));
}
LeanCallbackFlow::Stop
}
},
Err(_) => {
if let Ok(mut guard) = callback_error.lock() {
*guard = Some(StreamCallbackError::Malformed(
"stream forwarder mutex was poisoned".to_owned(),
));
}
LeanCallbackFlow::Stop
}
},
Ok(StreamCallbackEvent::Metadata(metadata)) => match callback_forwarder.lock() {
Ok(mut guard) => {
guard.set_metadata(metadata);
LeanCallbackFlow::Continue
}
Err(_) => {
if let Ok(mut guard) = callback_error.lock() {
*guard = Some(StreamCallbackError::Malformed(
"stream forwarder mutex was poisoned".to_owned(),
));
}
LeanCallbackFlow::Stop
}
},
Err(message) => {
if let Ok(mut guard) = callback_error.lock() {
*guard = Some(StreamCallbackError::Malformed(message));
}
LeanCallbackFlow::Stop
}
}
})
.map_err(StreamRunError::Host)?;
let (handle, trampoline) = callback.abi_parts();
let status = self
.session
.call_capability::<(&str, usize, usize), LeanIo<u8>>(export, (request_json, handle, trampoline), None)
.map_err(StreamRunError::Host)?;
if let Some(error) = row_error.lock().ok().and_then(|mut guard| guard.take()) {
return Err(match error {
StreamCallbackError::Malformed(message) => StreamRunError::MalformedRow(message),
StreamCallbackError::Write(message) => {
StreamRunError::Host(host_internal(format!("worker stream frame write failed: {message}")))
}
});
}
match LeanCallbackStatus::from_abi(status) {
Some(LeanCallbackStatus::Ok) => {}
Some(status) => return Err(StreamRunError::CallbackStatus(status)),
None => return Err(StreamRunError::ExportStatus(status)),
}
let guard = forwarder
.lock()
.map_err(|_| StreamRunError::MalformedRow("stream forwarder mutex was poisoned".to_owned()))?;
Ok(guard.summary(started.elapsed()))
}
/// Calls the export `export` with `request_json` and decodes the returned
/// string as `LeanWorkerCapabilityMetadata`.
///
/// # Errors
/// `CapabilityJsonError::Host` when the call fails; `CapabilityJsonError::Malformed`
/// (carrying the serde error text) when the returned JSON does not decode.
fn capability_metadata(
    &mut self,
    export: &str,
    request_json: &str,
) -> Result<LeanWorkerCapabilityMetadata, CapabilityJsonError> {
    let call = self
        .session
        .call_capability::<(&str,), LeanIo<String>>(export, (request_json,), None);
    let raw = match call {
        Ok(raw) => raw,
        Err(err) => return Err(CapabilityJsonError::Host(err)),
    };
    match serde_json::from_str(&raw) {
        Ok(metadata) => Ok(metadata),
        Err(err) => Err(CapabilityJsonError::Malformed(err.to_string())),
    }
}
/// Calls the export `export` with `request_json` and decodes the returned
/// string as `LeanWorkerDoctorReport`.
///
/// # Errors
/// `CapabilityJsonError::Host` when the call fails; `CapabilityJsonError::Malformed`
/// (carrying the serde error text) when the returned JSON does not decode.
fn capability_doctor(
    &mut self,
    export: &str,
    request_json: &str,
) -> Result<LeanWorkerDoctorReport, CapabilityJsonError> {
    let call = self
        .session
        .call_capability::<(&str,), LeanIo<String>>(export, (request_json,), None);
    let raw = match call {
        Ok(raw) => raw,
        Err(err) => return Err(CapabilityJsonError::Host(err)),
    };
    match serde_json::from_str(&raw) {
        Ok(report) => Ok(report),
        Err(err) => Err(CapabilityJsonError::Malformed(err.to_string())),
    }
}
/// Calls the export `export` with `request_json` and returns the raw string it
/// produced, without decoding it.
///
/// # Errors
/// Propagates the `LeanError` from the capability call.
fn json_command(&mut self, export: &str, request_json: &str) -> LeanResult<String> {
    let arguments = (request_json,);
    let response_json = self
        .session
        .call_capability::<(&str,), LeanIo<String>>(export, arguments, None)?;
    Ok(response_json)
}
/// Elaborates `source`, runs the `infer_type` meta query on the result, and
/// renders the answer with `meta_render_expr`.
///
/// An elaboration failure is returned as `LeanWorkerMetaResult::Failed` rather
/// than as an error. The meta query runs with default transparency.
///
/// # Errors
/// Propagates a `LeanError` from elaboration, the meta query, or rendering.
fn infer_type(
    &mut self,
    source: &str,
    options: &LeanWorkerElabOptions,
) -> LeanResult<LeanWorkerMetaResult<LeanWorkerRendered>> {
    let elab_options = options.to_host_options();
    let expr = match self.session.elaborate(source, None, &elab_options, None)? {
        Ok(expr) => expr,
        Err(failure) => return Ok(meta_failure_from_elab(&failure)),
    };
    let meta_options = options.to_host_meta_options(LeanMetaTransparency::Default);
    let query = meta::infer_type();
    let inferred = self.session.run_meta(&query, expr, &meta_options, None)?;
    meta_render_expr(&mut self.session, inferred, &meta_options)
}
/// Elaborates `source`, runs the `whnf` meta query on the result, and renders
/// the answer with `meta_render_expr`.
///
/// An elaboration failure is returned as `LeanWorkerMetaResult::Failed` rather
/// than as an error. The meta query runs with default transparency.
///
/// # Errors
/// Propagates a `LeanError` from elaboration, the meta query, or rendering.
fn whnf(
    &mut self,
    source: &str,
    options: &LeanWorkerElabOptions,
) -> LeanResult<LeanWorkerMetaResult<LeanWorkerRendered>> {
    let elab_options = options.to_host_options();
    let expr = match self.session.elaborate(source, None, &elab_options, None)? {
        Ok(expr) => expr,
        Err(failure) => return Ok(meta_failure_from_elab(&failure)),
    };
    let meta_options = options.to_host_meta_options(LeanMetaTransparency::Default);
    let query = meta::whnf();
    let reduced = self.session.run_meta(&query, expr, &meta_options, None)?;
    meta_render_expr(&mut self.session, reduced, &meta_options)
}
fn is_def_eq(
&mut self,
lhs: &str,
rhs: &str,
transparency: LeanWorkerMetaTransparency,
options: &LeanWorkerElabOptions,
) -> LeanResult<LeanWorkerMetaResult<bool>> {
let elab_options = options.to_host_options();
let lhs_outcome = self.session.elaborate(lhs, None, &elab_options, None)?;
let lhs_expr = match lhs_outcome {
Ok(expr) => expr,
Err(failure) => return Ok(meta_failure_from_elab(&failure)),
};
let rhs_outcome = self.session.elaborate(rhs, None, &elab_options, None)?;
let rhs_expr = match rhs_outcome {
Ok(expr) => expr,
Err(failure) => return Ok(meta_failure_from_elab(&failure)),
};
let transparency_host = transparency.into();
let meta_options = options.to_host_meta_options(transparency_host);
let response = self.session.run_meta(
&meta::is_def_eq(),
(lhs_expr, rhs_expr, transparency_host),
&meta_options,
None,
)?;
match response {
LeanMetaResponse::Ok(value) => Ok(LeanWorkerMetaResult::Ok { value }),
LeanMetaResponse::Failed(failure) => Ok(LeanWorkerMetaResult::Failed {
failure: elab_failure_wire(&failure),
}),
LeanMetaResponse::TimeoutOrHeartbeat(failure) => Ok(LeanWorkerMetaResult::TimeoutOrHeartbeat {
failure: elab_failure_wire(&failure),
}),
LeanMetaResponse::Unsupported(failure) => Ok(LeanWorkerMetaResult::Unsupported {
failure: elab_failure_wire(&failure),
}),
}
}
/// Describes the declaration `name`: its kind, rendered type (when the session
/// reports one), and source range (when the session reports one).
///
/// Returns `Ok(None)` when the session reports the kind `"missing"`.
///
/// # Errors
/// Propagates the first `LeanError` returned by the session.
fn describe(&mut self, name: &str) -> LeanResult<Option<LeanWorkerDeclarationRow>> {
    let kind = self.session.declaration_kind(name, None)?;
    if kind == "missing" {
        return Ok(None);
    }
    let declared_type = self.session.declaration_type(name, None)?;
    let type_signature = declared_type
        .map(|expr| self.session.expr_to_string_raw(&expr, None))
        .transpose()?;
    let range = self.session.declaration_source_range(name, None)?;
    let row = LeanWorkerDeclarationRow {
        name: name.to_owned(),
        kind,
        type_signature,
        source: range.map(source_range_wire),
    };
    Ok(Some(row))
}
/// Lists declaration names matching `filter` and writes each one as a data row
/// on the `rows` stream, returning the number of rows emitted.
///
/// With `progress` set, a `list_declarations_strings` tick is emitted before
/// the listing (`0`, no total) and after all rows are written (`total/total`).
///
/// # Errors
/// Propagates a `LeanError` from the listing. Returns a `host_internal` error
/// when encoding a row payload or writing a row frame fails.
fn list_declarations_strings(
    &mut self,
    filter: LeanWorkerDeclarationFilter,
    progress: bool,
    writer: &ProtocolWriter,
) -> LeanResult<u64> {
    let host_filter = LeanDeclarationFilter {
        include_private: filter.include_private,
        include_generated: filter.include_generated,
        include_internal: filter.include_internal,
    };
    if progress {
        emit_progress(writer, "list_declarations_strings", 0, None);
    }
    let listed = self.session.list_declarations_strings(&host_filter, None, None)?;
    let total = u64::try_from(listed.len()).unwrap_or(u64::MAX);
    let mut emitter = DataRowEmitter::default();
    for declaration in listed {
        let payload = match serde_json::value::to_raw_value(&declaration) {
            Ok(payload) => payload,
            Err(err) => {
                return Err(host_internal(format!(
                    "list_declarations_strings row payload encode failed: {err}"
                )));
            }
        };
        let frame = Message::DataRow(emitter.next("rows", payload));
        if let Err(err) = writer.write(frame) {
            return Err(host_internal(format!(
                "list_declarations_strings row frame write failed: {err}"
            )));
        }
    }
    if progress {
        emit_progress(writer, "list_declarations_strings", total, Some(total));
    }
    Ok(emitter.count())
}
/// Describes every entry in `names`, returning one row per name in input order.
///
/// Kinds and types are fetched with two bulk calls. A name whose kind is
/// `"missing"` (or that has no entry in the bulk kind result) gets a row with
/// no type and no source. For other names the type is rendered and the source
/// range is looked up per name. With `progress` set, a `describe_bulk` tick is
/// emitted after each row.
///
/// # Errors
/// Propagates the first `LeanError` returned by the session.
fn describe_bulk(
    &mut self,
    names: &[String],
    progress: bool,
    writer: &ProtocolWriter,
) -> LeanResult<Vec<LeanWorkerDeclarationRow>> {
    let name_refs: Vec<&str> = names.iter().map(String::as_str).collect();
    let kinds = self.session.declaration_kind_bulk(&name_refs, None, None)?;
    let types = self.session.declaration_type_bulk(&name_refs, None, None)?;
    let total = Some(u64::try_from(names.len()).unwrap_or(u64::MAX));
    let mut rows = Vec::with_capacity(names.len());
    for (idx, name) in names.iter().enumerate() {
        let kind = kinds.get(idx).cloned().unwrap_or_else(|| "missing".to_owned());
        let (type_signature, source) = if kind == "missing" {
            (None, None)
        } else {
            let rendered = types
                .get(idx)
                .and_then(Option::as_ref)
                .map(|expr| self.session.expr_to_string_raw(expr, None))
                .transpose()?;
            let range = self
                .session
                .declaration_source_range(name, None)?
                .map(source_range_wire);
            (rendered, range)
        };
        rows.push(LeanWorkerDeclarationRow {
            name: name.clone(),
            kind,
            type_signature,
            source,
        });
        if progress {
            let done = u64::try_from(idx.saturating_add(1)).unwrap_or(u64::MAX);
            emit_progress(writer, "describe_bulk", done, total);
        }
    }
    Ok(rows)
}
/// Processes `source` with info-tree collection and converts the outcome to
/// its wire form.
///
/// # Errors
/// Propagates the `LeanError` from the session call.
fn process_file(
    &mut self,
    source: &str,
    options: &LeanWorkerElabOptions,
) -> LeanResult<LeanWorkerProcessFileOutcome> {
    let host_options = options.to_host_options();
    let outcome = self.session.process_with_info_tree(source, &host_options, None)?;
    let wire = match outcome {
        ProcessFileOutcome::Processed(file) => LeanWorkerProcessFileOutcome::Processed {
            file: processed_file_wire(file),
        },
        ProcessFileOutcome::Unsupported => LeanWorkerProcessFileOutcome::Unsupported,
    };
    Ok(wire)
}
/// Processes `source` as a module with info-tree collection and converts the
/// outcome (ok, missing imports, header parse failure, unsupported) to its
/// wire form.
///
/// # Errors
/// Propagates the `LeanError` from the session call.
fn process_module(
    &mut self,
    source: &str,
    options: &LeanWorkerElabOptions,
) -> LeanResult<LeanWorkerProcessModuleOutcome> {
    let host_options = options.to_host_options();
    let outcome = self
        .session
        .process_module_with_info_tree(source, &host_options, None)?;
    let wire = match outcome {
        ProcessModuleOutcome::Ok { file, imports } => LeanWorkerProcessModuleOutcome::Ok {
            file: processed_file_wire(file),
            imports,
        },
        ProcessModuleOutcome::MissingImports { file, imports, missing } => {
            LeanWorkerProcessModuleOutcome::MissingImports {
                file: processed_file_wire(file),
                imports,
                missing,
            }
        }
        ProcessModuleOutcome::HeaderParseFailed { diagnostics } => {
            LeanWorkerProcessModuleOutcome::HeaderParseFailed {
                diagnostics: elab_failure_wire(&diagnostics),
            }
        }
        ProcessModuleOutcome::Unsupported => LeanWorkerProcessModuleOutcome::Unsupported,
    };
    Ok(wire)
}
}
/// A row parsed from a callback payload, before it has been passed through a
/// `DataRowEmitter` and written as a frame.
#[derive(Clone, Debug)]
struct PendingDataRow {
// Name of the stream the row belongs to (non-empty when built by `parse_row_envelope`).
stream: String,
// Row payload kept as raw, unparsed JSON.
payload: Box<RawValue>,
}
/// One event decoded from a stream callback payload by `parse_row_envelope`.
enum StreamCallbackEvent {
// A data row to forward.
Row(PendingDataRow),
// A diagnostic (code + message) to forward.
Diagnostic(Diagnostic),
// A progress tick to forward.
Progress(ProgressTick),
// Arbitrary JSON metadata; stored by the forwarder and included in the stream summary.
Metadata(serde_json::Value),
}
/// Failure recorded by the stream callback; the first one stops the stream.
///
/// Derives `Debug`, like the sibling `StreamRunError`, so the value can be
/// logged or used in assertions.
#[derive(Debug)]
enum StreamCallbackError {
    /// The payload could not be parsed, or the forwarder mutex was poisoned.
    Malformed(String),
    /// Writing a frame failed; carries the write error's text.
    Write(String),
}
/// Forwards stream callback events to the protocol writer and tracks what is
/// needed to build the final `StreamSummary`.
struct StreamForwarder {
// Destination for every forwarded frame.
writer: ProtocolWriter,
// Turns pending rows into data rows and keeps the row counts.
emitter: DataRowEmitter,
// When true, a `data_stream` progress tick is emitted after each row.
progress: bool,
// Most recent metadata value received, if any.
metadata: Option<serde_json::Value>,
}
impl StreamForwarder {
fn new(writer: ProtocolWriter, progress: bool) -> Self {
Self {
writer,
emitter: DataRowEmitter::default(),
progress,
metadata: None,
}
}
fn emit_row(&mut self, row: PendingDataRow) -> Result<(), ProtocolError> {
let row = self.emitter.next(row.stream, row.payload);
self.writer.write(Message::DataRow(row))?;
if self.progress {
emit_progress(&self.writer, "data_stream", self.emitter.count(), None);
}
Ok(())
}
fn emit_diagnostic(&self, diagnostic: Diagnostic) -> Result<(), ProtocolError> {
self.writer.write(Message::Diagnostic(diagnostic))
}
fn emit_progress(&self, progress: ProgressTick) -> Result<(), ProtocolError> {
self.writer.write(Message::ProgressTick(progress))
}
fn set_metadata(&mut self, metadata: serde_json::Value) {
self.metadata = Some(metadata);
}
fn summary(&self, elapsed: std::time::Duration) -> StreamSummary {
StreamSummary::new(
self.emitter.count(),
self.emitter.per_stream_counts(),
elapsed,
self.metadata.clone(),
)
}
}
/// Reasons a `run_data_stream` call can fail.
#[derive(Debug)]
enum StreamRunError {
// A Lean/host error, including frame-write failures wrapped with `host_internal`.
Host(LeanError),
// The export returned a status byte that `LeanCallbackStatus::from_abi` does not recognise.
ExportStatus(u8),
// The export returned a recognised status other than `Ok`.
CallbackStatus(LeanCallbackStatus),
// A callback payload was malformed, or the forwarder mutex was poisoned.
MalformedRow(String),
}
/// Reasons a JSON-returning capability call (metadata or doctor) can fail.
///
/// Derives `Debug` for consistency with `StreamRunError`, so the value can be
/// logged or used in assertions.
#[derive(Debug)]
enum CapabilityJsonError {
    /// The capability call itself failed.
    Host(LeanError),
    /// The returned string did not decode; carries the serde error text.
    Malformed(String),
}
/// Wraps a protocol write failure as a host-internal `StreamRunError::Host`.
impl From<crate::protocol::ProtocolError> for StreamRunError {
    fn from(err: crate::protocol::ProtocolError) -> Self {
        let detail = format!("worker data-row frame write failed: {err}");
        Self::Host(host_internal(detail))
    }
}
fn parse_row_envelope(raw: &str) -> Result<StreamCallbackEvent, String> {
let envelope: RowCallbackEnvelope =
serde_json::from_str(raw).map_err(|err| format!("row callback payload is not valid JSON: {err}"))?;
if let Some(diagnostic) = envelope.diagnostic {
let code = diagnostic
.code
.filter(|value| !value.is_empty())
.ok_or_else(|| "diagnostic callback payload must contain a non-empty string field `code`".to_owned())?;
let message = diagnostic
.message
.ok_or_else(|| "diagnostic callback payload must contain a string field `message`".to_owned())?;
return Ok(StreamCallbackEvent::Diagnostic(Diagnostic { code, message }));
}
if let Some(progress) = envelope.progress {
let phase = progress
.phase
.filter(|value| !value.is_empty())
.ok_or_else(|| "progress callback payload must contain a non-empty string field `phase`".to_owned())?;
return Ok(StreamCallbackEvent::Progress(ProgressTick {
phase,
current: progress.current,
total: progress.total,
}));
}
if let Some(metadata) = envelope.metadata {
let metadata = serde_json::from_str(metadata.get())
.map_err(|err| format!("metadata callback payload is not valid JSON: {err}"))?;
return Ok(StreamCallbackEvent::Metadata(metadata));
}
let stream = envelope
.stream
.filter(|value| !value.is_empty())
.ok_or_else(|| "row callback payload must contain a non-empty string field `stream`".to_owned())?;
let payload = envelope
.payload
.ok_or_else(|| "row callback payload must contain field `payload`".to_owned())?;
Ok(StreamCallbackEvent::Row(PendingDataRow { stream, payload }))
}
/// JSON envelope of one stream callback payload. All fields are optional;
/// `parse_row_envelope` decides which kind of event the envelope represents.
#[derive(Deserialize)]
struct RowCallbackEnvelope {
// Stream name for a data row.
stream: Option<String>,
// Raw JSON payload for a data row.
payload: Option<Box<RawValue>>,
// Present when the envelope carries a diagnostic.
diagnostic: Option<RowCallbackDiagnostic>,
// Present when the envelope carries a progress tick.
progress: Option<RowCallbackProgress>,
// Raw JSON metadata, parsed into a `serde_json::Value` by `parse_row_envelope`.
metadata: Option<Box<RawValue>>,
}
/// `diagnostic` object of a callback envelope. Both fields are optional at the
/// JSON level; `parse_row_envelope` rejects a missing or empty `code` and a
/// missing `message`.
#[derive(Deserialize)]
struct RowCallbackDiagnostic {
code: Option<String>,
message: Option<String>,
}
/// `progress` object of a callback envelope.
#[derive(Deserialize)]
struct RowCallbackProgress {
// Optional at the JSON level; `parse_row_envelope` rejects a missing or empty phase.
phase: Option<String>,
// Not optional: NOTE(review): with the derived `Deserialize`, a payload lacking `current`
// presumably fails at the JSON-decoding step — confirm against serde's missing-field rules.
current: u64,
// Total count, when known.
total: Option<u64>,
}
impl LeanWorkerElabOptions {
    /// Builds host elaboration options from the namespace context, file label,
    /// heartbeat limit, and diagnostic byte limit of these worker options.
    fn to_host_options(&self) -> LeanElabOptions {
        let base = LeanElabOptions::new();
        let scoped = base.namespace_context(&self.namespace_context);
        let labelled = scoped.file_label(&self.file_label);
        let limited = labelled.heartbeat_limit(self.heartbeat_limit);
        limited.diagnostic_byte_limit(self.diagnostic_byte_limit)
    }

    /// Builds host meta options from the namespace context, heartbeat limit,
    /// and diagnostic byte limit of these worker options, plus `transparency`.
    fn to_host_meta_options(&self, transparency: LeanMetaTransparency) -> LeanMetaOptions {
        let base = LeanMetaOptions::new();
        let scoped = base.namespace_context(&self.namespace_context);
        let limited = scoped.heartbeat_limit(self.heartbeat_limit);
        let bounded = limited.diagnostic_byte_limit(self.diagnostic_byte_limit);
        bounded.transparency(transparency)
    }
}
impl From<LeanWorkerMetaTransparency> for LeanMetaTransparency {
fn from(value: LeanWorkerMetaTransparency) -> Self {
match value {
LeanWorkerMetaTransparency::Default => Self::Default,
LeanWorkerMetaTransparency::Reducible => Self::Reducible,
LeanWorkerMetaTransparency::Instances => Self::Instances,
LeanWorkerMetaTransparency::All => Self::All,
}
}
}
/// Converts a host elaboration failure into its wire representation.
///
/// The diagnostics are converted by `diagnostics`, and the `truncated` flag is
/// copied from the failure.
fn elab_failure_wire(failure: &LeanElabFailure) -> LeanWorkerElabFailure {
    let entries = diagnostics(failure);
    let truncated = failure.truncated();
    LeanWorkerElabFailure {
        diagnostics: entries,
        truncated,
    }
}
/// Wraps an elaboration failure as the `Failed` variant of a meta result,
/// for any payload type `T`.
fn meta_failure_from_elab<T>(failure: &LeanElabFailure) -> LeanWorkerMetaResult<T> {
    let failure = elab_failure_wire(failure);
    LeanWorkerMetaResult::Failed { failure }
}
/// Turns the response of an expression-producing meta query into a rendered wire result.
///
/// Non-`Ok` variants of `response` keep their kind and have their failure
/// converted by `elab_failure_wire`. For an `Ok` expression the session runs
/// `meta::pp_expr()` on a clone of it, and the result is mapped as follows:
///
/// * `Ok`: the returned text, tagged `LeanWorkerRendering::Pretty`.
/// * `Unsupported`: the output of `session.expr_to_string_raw`, tagged
///   `LeanWorkerRendering::Raw`.
/// * `Failed` / `TimeoutOrHeartbeat`: the matching wire failure variant.
///
/// # Errors
///
/// Propagates errors returned by `run_meta` or `expr_to_string_raw`.
fn meta_render_expr(
    session: &mut LeanSession<'static, 'static>,
    response: LeanMetaResponse<lean_rs::LeanExpr<'static>>,
    meta_options: &LeanMetaOptions,
) -> LeanResult<LeanWorkerMetaResult<LeanWorkerRendered>> {
    let expr = match response {
        LeanMetaResponse::Ok(expr) => expr,
        LeanMetaResponse::Failed(failure) => {
            let failure = elab_failure_wire(&failure);
            return Ok(LeanWorkerMetaResult::Failed { failure });
        }
        LeanMetaResponse::TimeoutOrHeartbeat(failure) => {
            let failure = elab_failure_wire(&failure);
            return Ok(LeanWorkerMetaResult::TimeoutOrHeartbeat { failure });
        }
        LeanMetaResponse::Unsupported(failure) => {
            let failure = elab_failure_wire(&failure);
            return Ok(LeanWorkerMetaResult::Unsupported { failure });
        }
    };

    // A clone is handed to the pretty-printer so `expr` stays usable for the raw fallback.
    let pretty = session.run_meta(&meta::pp_expr(), expr.clone(), meta_options, None)?;
    let outcome = match pretty {
        LeanMetaResponse::Ok(text) => {
            let value = LeanWorkerRendered {
                value: text,
                rendering: LeanWorkerRendering::Pretty,
            };
            LeanWorkerMetaResult::Ok { value }
        }
        LeanMetaResponse::Unsupported(_) => {
            let raw = session.expr_to_string_raw(&expr, None)?;
            let value = LeanWorkerRendered {
                value: raw,
                rendering: LeanWorkerRendering::Raw,
            };
            LeanWorkerMetaResult::Ok { value }
        }
        LeanMetaResponse::Failed(failure) => {
            let failure = elab_failure_wire(&failure);
            LeanWorkerMetaResult::Failed { failure }
        }
        LeanMetaResponse::TimeoutOrHeartbeat(failure) => {
            let failure = elab_failure_wire(&failure);
            LeanWorkerMetaResult::TimeoutOrHeartbeat { failure }
        }
    };
    Ok(outcome)
}
fn source_range_wire(range: LeanSourceRange) -> LeanWorkerSourceRange {
LeanWorkerSourceRange {
file: range.file,
start_line: range.start_line,
start_column: range.start_column,
end_line: range.end_line,
end_column: range.end_column,
}
}
fn command_info_wire(node: CommandInfoNode) -> LeanWorkerCommandInfo {
LeanWorkerCommandInfo {
start_line: node.start_line,
start_column: node.start_column,
end_line: node.end_line,
end_column: node.end_column,
decl_name: node.decl_name,
}
}
fn term_info_wire(node: TermInfoNode) -> LeanWorkerTermInfo {
LeanWorkerTermInfo {
start_line: node.start_line,
start_column: node.start_column,
end_line: node.end_line,
end_column: node.end_column,
expr_str: node.expr_str,
type_str: node.type_str,
expected_type_str: node.expected_type_str,
}
}
fn tactic_info_wire(node: TacticInfoNode) -> LeanWorkerTacticInfo {
LeanWorkerTacticInfo {
start_line: node.start_line,
start_column: node.start_column,
end_line: node.end_line,
end_column: node.end_column,
goals_before: node.goals_before,
goals_after: node.goals_after,
}
}
fn name_ref_wire(node: NameRefNode) -> LeanWorkerNameRef {
LeanWorkerNameRef {
start_line: node.start_line,
start_column: node.start_column,
end_line: node.end_line,
end_column: node.end_column,
name: node.name,
is_binder: node.is_binder,
}
}
fn processed_file_wire(file: ProcessedFile) -> LeanWorkerProcessedFile {
LeanWorkerProcessedFile {
commands: file.commands.into_iter().map(command_info_wire).collect(),
terms: file.terms.into_iter().map(term_info_wire).collect(),
tactics: file.tactics.into_iter().map(tactic_info_wire).collect(),
names: file.names.into_iter().map(name_ref_wire).collect(),
diagnostics: elab_failure_wire(&file.diagnostics),
}
}
/// Builds an unsuccessful elaboration result (`success: false`) from a failure,
/// carrying its converted diagnostics and `truncated` flag.
fn elab_failure_outcome(failure: &LeanElabFailure) -> LeanWorkerElabResult {
    let entries = diagnostics(failure);
    let truncated = failure.truncated();
    LeanWorkerElabResult {
        success: false,
        diagnostics: entries,
        truncated,
    }
}
/// Builds a kernel result for a failed check.
///
/// The caller-supplied `status` is stored as-is, the diagnostics and
/// `truncated` flag come from `failure`, and no summary is attached.
fn kernel_failure_outcome(status: LeanWorkerKernelStatus, failure: &LeanElabFailure) -> LeanWorkerKernelResult {
    let entries = diagnostics(failure);
    let truncated = failure.truncated();
    LeanWorkerKernelResult {
        status,
        diagnostics: entries,
        truncated,
        summary: None,
    }
}
/// Converts every diagnostic carried by `failure` into its wire form.
///
/// Severity becomes the lowercase string `"info"`, `"warning"` or `"error"`.
/// When a diagnostic has a position, `line` and `column` are set and the end
/// coordinates are taken as reported; without a position all four are `None`.
fn diagnostics(failure: &LeanElabFailure) -> Vec<LeanWorkerDiagnostic> {
    failure
        .diagnostics()
        .iter()
        .map(|entry| {
            let (line, column, end_line, end_column) = match entry.position() {
                Some(span) => (
                    Some(span.line()),
                    Some(span.column()),
                    span.end_line(),
                    span.end_column(),
                ),
                None => (None, None, None, None),
            };
            let severity = match entry.severity() {
                LeanSeverity::Info => "info",
                LeanSeverity::Warning => "warning",
                LeanSeverity::Error => "error",
            };
            LeanWorkerDiagnostic {
                severity: severity.to_owned(),
                message: entry.message().to_owned(),
                file_label: entry.file_label().to_owned(),
                line,
                column,
                end_line,
                end_column,
            }
        })
        .collect()
}
/// Writes a progress tick frame for `phase` with the given counters.
///
/// The write is best-effort: its result is dropped, so a write failure is ignored.
fn emit_progress(writer: &ProtocolWriter, phase: &str, current: u64, total: Option<u64>) {
    let tick = ProgressTick {
        phase: phase.to_owned(),
        current,
        total,
    };
    let outcome = writer.write(Message::ProgressTick(tick));
    drop(outcome);
}
/// Writes one data row per entry of `streams` and returns the emitter's count.
///
/// Each row's payload is the JSON object `{"stream": <name>, "index": <position>}`,
/// where the position is the entry's index within `streams`.
///
/// # Errors
///
/// Stops at, and returns, the first payload-serialization or frame-write error.
fn emit_test_rows(writer: &ProtocolWriter, streams: &[String]) -> Result<u64, crate::protocol::ProtocolError> {
    let mut emitter = DataRowEmitter::default();
    for (position, name) in streams.iter().enumerate() {
        let body = serde_json::json!({
            "stream": name,
            "index": position,
        });
        let payload = serde_json::value::to_raw_value(&body)?;
        let message = Message::DataRow(emitter.next(name.clone(), payload));
        writer.write(message)?;
    }
    Ok(emitter.count())
}
/// Returns an owned copy of `path`.
#[allow(dead_code, reason = "reserved for future worker configuration paths")]
fn _path_for_diagnostics(path: &Path) -> PathBuf {
    path.to_owned()
}