1use std::path::{Path, PathBuf};
2use std::process::ExitCode;
3use std::sync::{Arc, Mutex};
4use std::time::Instant;
5
6use lean_rs::error::host_internal;
7use lean_rs::module::LeanIo;
8use lean_rs::{
9 LeanCallbackFlow, LeanCallbackHandle, LeanCallbackStatus, LeanError, LeanResult, LeanRuntime, LeanStringEvent,
10};
11use lean_rs_host::host::process::{
12 CommandInfoNode, NameRefNode, ProcessFileOutcome, ProcessModuleOutcome, ProcessedFile, TacticInfoNode, TermInfoNode,
13};
14use lean_rs_host::meta::{self, LeanMetaOptions, LeanMetaResponse, LeanMetaTransparency};
15use lean_rs_host::{
16 LeanCapabilities, LeanDeclarationFilter, LeanElabFailure, LeanElabOptions, LeanHost, LeanKernelOutcome,
17 LeanSession, LeanSeverity, LeanSourceRange,
18};
19use serde::Deserialize;
20use serde_json::value::RawValue;
21
22use crate::protocol::{
23 DataRowEmitter, Diagnostic, Message, ProgressTick, ProtocolError, Request, Response, StreamSummary, read_frame,
24 write_frame,
25};
26use crate::types::{
27 LeanWorkerCapabilityMetadata, LeanWorkerCommandInfo, LeanWorkerDeclarationFilter, LeanWorkerDeclarationRow,
28 LeanWorkerDiagnostic, LeanWorkerDoctorReport, LeanWorkerElabFailure, LeanWorkerElabOptions, LeanWorkerElabResult,
29 LeanWorkerKernelResult, LeanWorkerKernelStatus, LeanWorkerKernelSummary, LeanWorkerMetaResult,
30 LeanWorkerMetaTransparency, LeanWorkerNameRef, LeanWorkerProcessFileOutcome, LeanWorkerProcessModuleOutcome,
31 LeanWorkerProcessedFile, LeanWorkerRendered, LeanWorkerRendering, LeanWorkerSourceRange, LeanWorkerTacticInfo,
32 LeanWorkerTermInfo,
33};
34
35#[derive(Clone)]
36struct ProtocolWriter {
37 stdout: Arc<Mutex<std::io::Stdout>>,
38}
39
40impl ProtocolWriter {
41 fn new() -> Self {
42 Self {
43 stdout: Arc::new(Mutex::new(std::io::stdout())),
44 }
45 }
46
47 fn write(&self, message: Message) -> Result<(), ProtocolError> {
48 let mut stdout = self
49 .stdout
50 .lock()
51 .map_err(|_| ProtocolError::Io(std::io::Error::other("worker stdout mutex was poisoned")))?;
52 write_frame(&mut *stdout, message)
53 }
54}
55
56pub(crate) fn run_stdio() -> ExitCode {
57 install_immediate_abort_exit();
58 match serve_stdio() {
59 Ok(()) => ExitCode::SUCCESS,
60 Err(err) => {
61 eprintln!("lean-rs-worker-child: {err}");
62 ExitCode::FAILURE
63 }
64 }
65}
66
67#[cfg(unix)]
105#[allow(
106 unsafe_code,
107 reason = "installing a signal handler and calling setrlimit/prctl require libc FFI"
108)]
109fn install_immediate_abort_exit() {
110 extern "C" fn on_sigabrt(_sig: libc::c_int) {
111 const MARKER: &[u8] = b"lean-rs-worker child: SIGABRT, exiting immediately\n";
115 unsafe {
116 let _ = libc::write(libc::STDERR_FILENO, MARKER.as_ptr().cast(), MARKER.len());
117 libc::_exit(134);
118 }
119 }
120
121 unsafe {
126 let mut action: libc::sigaction = std::mem::zeroed();
127 action.sa_sigaction = on_sigabrt as *const () as libc::sighandler_t;
128 libc::sigemptyset(&raw mut action.sa_mask);
129 action.sa_flags = libc::SA_RESETHAND;
130 let _ = libc::sigaction(libc::SIGABRT, &raw const action, std::ptr::null_mut());
131 }
132
133 unsafe {
139 let limit = libc::rlimit {
140 rlim_cur: 0,
141 rlim_max: 0,
142 };
143 let _ = libc::setrlimit(libc::RLIMIT_CORE, &raw const limit);
144 #[cfg(target_os = "linux")]
145 {
146 let zero: libc::c_ulong = 0;
147 let _ = libc::prctl(libc::PR_SET_DUMPABLE, zero, zero, zero, zero);
148 }
149 }
150}
151
152#[cfg(not(unix))]
153fn install_immediate_abort_exit() {}
154
155#[allow(
156 clippy::significant_drop_tightening,
157 reason = "the child owns stdin/stdout for the full protocol loop"
158)]
159fn serve_stdio() -> Result<(), Box<dyn std::error::Error>> {
160 let runtime = LeanRuntime::init()?;
161 let stdin = std::io::stdin();
162 let mut reader = stdin.lock();
163 let writer = ProtocolWriter::new();
164 let mut host_session: Option<HostSessionState> = None;
165
166 writer.write(Message::Handshake {
167 worker_version: env!("CARGO_PKG_VERSION").to_owned(),
168 protocol_version: crate::protocol::PROTOCOL_VERSION,
169 })?;
170
171 loop {
172 let frame = read_frame(&mut reader)?;
173 let Message::Request(request) = frame.message else {
174 writer.write(Message::Response(Response::Error {
175 code: "lean_rs.worker.protocol.unexpected_frame".to_owned(),
176 message: "child expected request frame".to_owned(),
177 }))?;
178 continue;
179 };
180
181 match request {
182 Request::Health => {
183 writer.write(Message::Response(Response::HealthOk))?;
184 }
185 Request::LoadFixtureCapability { fixture_root } => {
186 let response = match load_fixture_capability(runtime, Path::new(&fixture_root)) {
187 Ok(()) => Response::CapabilityLoaded,
188 Err(err) => error_response(&err),
189 };
190 writer.write(Message::Response(response))?;
191 }
192 Request::CallFixtureMul { fixture_root, lhs, rhs } => {
193 let response = match call_fixture_mul(runtime, Path::new(&fixture_root), lhs, rhs) {
194 Ok(value) => Response::U64 { value },
195 Err(err) => error_response(&err),
196 };
197 writer.write(Message::Response(response))?;
198 }
199 Request::TriggerLeanPanic { fixture_root } => {
200 let response = match trigger_lean_panic(runtime, Path::new(&fixture_root)) {
201 Ok(()) => Response::Error {
202 code: "lean_rs.worker.panic_fixture_returned".to_owned(),
203 message: "Lean panic fixture returned instead of terminating the child".to_owned(),
204 },
205 Err(err) => error_response(&err),
206 };
207 writer.write(Message::Response(response))?;
208 }
209 Request::OpenHostSession {
210 project_root,
211 package,
212 lib_name,
213 imports,
214 } => {
215 let response = match HostSessionState::open(runtime, &project_root, &package, &lib_name, &imports) {
216 Ok(state) => {
217 host_session = Some(state);
218 Response::HostSessionOpened
219 }
220 Err(err) => error_response(&err),
221 };
222 writer.write(Message::Response(response))?;
223 }
224 Request::Elaborate { source, options } => {
225 let response = match host_session.as_mut() {
226 Some(state) => match state.elaborate(&source, &options) {
227 Ok(outcome) => Response::Elaboration { outcome },
228 Err(err) => error_response(&err),
229 },
230 None => missing_session_response(),
231 };
232 writer.write(Message::Response(response))?;
233 }
234 Request::KernelCheck {
235 source,
236 options,
237 progress,
238 } => {
239 let response = match host_session.as_mut() {
240 Some(state) => match state.kernel_check(&source, &options, progress, &writer) {
241 Ok(outcome) => Response::KernelCheck { outcome },
242 Err(err) => error_response(&err),
243 },
244 None => missing_session_response(),
245 };
246 writer.write(Message::Response(response))?;
247 }
248 Request::DeclarationKinds { names, progress } => {
249 let response = match host_session.as_mut() {
250 Some(state) => match state.declaration_kinds(&names, progress, &writer) {
251 Ok(values) => Response::Strings { values },
252 Err(err) => error_response(&err),
253 },
254 None => missing_session_response(),
255 };
256 writer.write(Message::Response(response))?;
257 }
258 Request::DeclarationNames { names, progress } => {
259 let response = match host_session.as_mut() {
260 Some(state) => match state.declaration_names(&names, progress, &writer) {
261 Ok(values) => Response::Strings { values },
262 Err(err) => error_response(&err),
263 },
264 None => missing_session_response(),
265 };
266 writer.write(Message::Response(response))?;
267 }
268 Request::RunDataStream {
269 export,
270 request_json,
271 progress,
272 } => {
273 let response = match host_session.as_mut() {
274 Some(state) => match state.run_data_stream(&export, &request_json, progress, &writer) {
275 Ok(summary) => Response::StreamComplete { summary },
276 Err(StreamRunError::Host(err)) => error_response(&err),
277 Err(StreamRunError::ExportStatus(status)) => {
278 Response::StreamExportFailed { status_byte: status }
279 }
280 Err(StreamRunError::CallbackStatus(status)) => Response::StreamCallbackFailed {
281 status_byte: status.as_abi(),
282 description: status.description().to_owned(),
283 },
284 Err(StreamRunError::MalformedRow(message)) => Response::StreamRowMalformed { message },
285 },
286 None => missing_session_response(),
287 };
288 writer.write(Message::Response(response))?;
289 }
290 Request::CapabilityMetadata { export, request_json } => {
291 let response = match host_session.as_mut() {
292 Some(state) => match state.capability_metadata(&export, &request_json) {
293 Ok(metadata) => Response::CapabilityMetadata { metadata },
294 Err(CapabilityJsonError::Host(err)) => error_response(&err),
295 Err(CapabilityJsonError::Malformed(message)) => {
296 Response::CapabilityMetadataMalformed { message }
297 }
298 },
299 None => missing_session_response(),
300 };
301 writer.write(Message::Response(response))?;
302 }
303 Request::CapabilityDoctor { export, request_json } => {
304 let response = match host_session.as_mut() {
305 Some(state) => match state.capability_doctor(&export, &request_json) {
306 Ok(report) => Response::CapabilityDoctor { report },
307 Err(CapabilityJsonError::Host(err)) => error_response(&err),
308 Err(CapabilityJsonError::Malformed(message)) => Response::CapabilityDoctorMalformed { message },
309 },
310 None => missing_session_response(),
311 };
312 writer.write(Message::Response(response))?;
313 }
314 Request::JsonCommand { export, request_json } => {
315 let response = match host_session.as_mut() {
316 Some(state) => match state.json_command(&export, &request_json) {
317 Ok(response_json) => Response::JsonCommand { response_json },
318 Err(err) => error_response(&err),
319 },
320 None => missing_session_response(),
321 };
322 writer.write(Message::Response(response))?;
323 }
324 Request::InferType { source, options } => {
325 let response = match host_session.as_mut() {
326 Some(state) => match state.infer_type(&source, &options) {
327 Ok(result) => Response::MetaExpr { result },
328 Err(err) => error_response(&err),
329 },
330 None => missing_session_response(),
331 };
332 writer.write(Message::Response(response))?;
333 }
334 Request::Whnf { source, options } => {
335 let response = match host_session.as_mut() {
336 Some(state) => match state.whnf(&source, &options) {
337 Ok(result) => Response::MetaExpr { result },
338 Err(err) => error_response(&err),
339 },
340 None => missing_session_response(),
341 };
342 writer.write(Message::Response(response))?;
343 }
344 Request::IsDefEq {
345 lhs,
346 rhs,
347 transparency,
348 options,
349 } => {
350 let response = match host_session.as_mut() {
351 Some(state) => match state.is_def_eq(&lhs, &rhs, transparency, &options) {
352 Ok(result) => Response::MetaBool { result },
353 Err(err) => error_response(&err),
354 },
355 None => missing_session_response(),
356 };
357 writer.write(Message::Response(response))?;
358 }
359 Request::Describe { name } => {
360 let response = match host_session.as_mut() {
361 Some(state) => match state.describe(&name) {
362 Ok(row) => Response::Declaration { row },
363 Err(err) => error_response(&err),
364 },
365 None => missing_session_response(),
366 };
367 writer.write(Message::Response(response))?;
368 }
369 Request::ListDeclarationsStrings { filter, progress } => {
370 let response = match host_session.as_mut() {
371 Some(state) => match state.list_declarations_strings(filter, progress, &writer) {
372 Ok(count) => Response::RowsComplete { count },
373 Err(err) => error_response(&err),
374 },
375 None => missing_session_response(),
376 };
377 writer.write(Message::Response(response))?;
378 }
379 Request::DescribeBulk { names, progress } => {
380 let response = match host_session.as_mut() {
381 Some(state) => match state.describe_bulk(&names, progress, &writer) {
382 Ok(rows) => Response::DeclarationBulk { rows },
383 Err(err) => error_response(&err),
384 },
385 None => missing_session_response(),
386 };
387 writer.write(Message::Response(response))?;
388 }
389 Request::ProcessFile { source, options } => {
390 let response = match host_session.as_mut() {
391 Some(state) => match state.process_file(&source, &options) {
392 Ok(outcome) => Response::ProcessFile { outcome },
393 Err(err) => error_response(&err),
394 },
395 None => missing_session_response(),
396 };
397 writer.write(Message::Response(response))?;
398 }
399 Request::ProcessModule { source, options } => {
400 let response = match host_session.as_mut() {
401 Some(state) => match state.process_module(&source, &options) {
402 Ok(outcome) => Response::ProcessModule { outcome },
403 Err(err) => error_response(&err),
404 },
405 None => missing_session_response(),
406 };
407 writer.write(Message::Response(response))?;
408 }
409 Request::EmitTestRows { streams } => {
410 let count = emit_test_rows(&writer, &streams)?;
411 writer.write(Message::Response(Response::RowsComplete { count }))?;
412 }
413 Request::EmitTestRowsThenExit => {
414 let _count = emit_test_rows(&writer, &["rows".to_owned()])?;
415 return Ok(());
416 }
417 Request::EmitTestRowsThenPanic => {
418 let _count = emit_test_rows(&writer, &["rows".to_owned()])?;
419 std::process::abort();
420 }
421 Request::Terminate => {
422 writer.write(Message::Response(Response::Terminating))?;
423 return Ok(());
424 }
425 }
426 }
427}
428
429fn load_fixture_capability(runtime: &'static LeanRuntime, fixture_root: &Path) -> LeanResult<()> {
430 let host = LeanHost::from_lake_project(runtime, fixture_root)?;
431 let _caps = host.load_capabilities("lean_rs_fixture", "LeanRsFixture")?;
432 Ok(())
433}
434
435fn call_fixture_mul(runtime: &'static LeanRuntime, fixture_root: &Path, lhs: u64, rhs: u64) -> LeanResult<u64> {
436 let host = LeanHost::from_lake_project(runtime, fixture_root)?;
437 let caps = host.load_capabilities("lean_rs_fixture", "LeanRsFixture")?;
438 let mut session = caps.session(&["LeanRsFixture.Scalars"], None, None)?;
439 session.call_capability::<(u64, u64), u64>("lean_rs_fixture_u64_mul", (lhs, rhs), None)
440}
441
442fn trigger_lean_panic(runtime: &'static LeanRuntime, fixture_root: &Path) -> LeanResult<()> {
443 let host = LeanHost::from_lake_project(runtime, fixture_root)?;
444 let caps = host.load_capabilities("lean_rs_fixture", "LeanRsFixture")?;
445 let mut session = caps.session(&["LeanRsFixture.Effects"], None, None)?;
446 session.call_capability::<(u8,), ()>("lean_rs_fixture_panic_unit", (0,), None)
447}
448
449fn error_response(err: &LeanError) -> Response {
450 Response::Error {
451 code: err.code().as_str().to_owned(),
452 message: err.to_string(),
453 }
454}
455
456fn missing_session_response() -> Response {
457 Response::Error {
458 code: "lean_rs.worker.session_missing".to_owned(),
459 message: "open a LeanWorkerSession before sending host-session requests".to_owned(),
460 }
461}
462
463struct HostSessionState {
464 #[allow(dead_code, reason = "leaked host anchors the capability and session lifetimes")]
465 host: &'static LeanHost<'static>,
466 #[allow(dead_code, reason = "leaked capabilities anchor the session borrow")]
467 capabilities: &'static LeanCapabilities<'static, 'static>,
468 session: LeanSession<'static, 'static>,
469}
470
471impl HostSessionState {
472 fn open(
473 runtime: &'static LeanRuntime,
474 project_root: &str,
475 package: &str,
476 lib_name: &str,
477 imports: &[String],
478 ) -> LeanResult<Self> {
479 let host = Box::leak(Box::new(LeanHost::from_lake_project(runtime, Path::new(project_root))?));
480 let capabilities = Box::leak(Box::new(host.load_capabilities(package, lib_name)?));
481 let import_refs: Vec<&str> = imports.iter().map(String::as_str).collect();
482 let session = capabilities.session(&import_refs, None, None)?;
483 Ok(Self {
484 host,
485 capabilities,
486 session,
487 })
488 }
489
490 fn elaborate(&mut self, source: &str, options: &LeanWorkerElabOptions) -> LeanResult<LeanWorkerElabResult> {
491 let options = options.to_host_options();
492 let outcome = self.session.elaborate(source, None, &options, None)?;
493 Ok(match outcome {
494 Ok(_expr) => LeanWorkerElabResult {
495 success: true,
496 diagnostics: Vec::new(),
497 truncated: false,
498 },
499 Err(failure) => elab_failure_outcome(&failure),
500 })
501 }
502
503 fn kernel_check(
504 &mut self,
505 source: &str,
506 options: &LeanWorkerElabOptions,
507 progress: bool,
508 writer: &ProtocolWriter,
509 ) -> LeanResult<LeanWorkerKernelResult> {
510 if progress {
511 emit_progress(writer, "kernel_check", 0, Some(1));
512 }
513 let options = options.to_host_options();
514 let outcome = self.session.kernel_check(source, &options, None, None)?;
515 if progress {
516 emit_progress(writer, "kernel_check", 1, Some(1));
517 }
518 Ok(match outcome {
519 LeanKernelOutcome::Checked(evidence) => {
520 let summary = self.session.summarize_evidence(&evidence, None)?;
521 LeanWorkerKernelResult {
522 status: LeanWorkerKernelStatus::Checked,
523 diagnostics: Vec::new(),
524 truncated: false,
525 summary: Some(LeanWorkerKernelSummary {
526 declaration_name: summary.declaration_name().to_owned(),
527 kind: summary.kind().to_owned(),
528 type_signature: summary.type_signature().to_owned(),
529 }),
530 }
531 }
532 LeanKernelOutcome::Rejected(failure) => kernel_failure_outcome(LeanWorkerKernelStatus::Rejected, &failure),
533 LeanKernelOutcome::Unavailable(failure) => {
534 kernel_failure_outcome(LeanWorkerKernelStatus::Unavailable, &failure)
535 }
536 LeanKernelOutcome::Unsupported(failure) => {
537 kernel_failure_outcome(LeanWorkerKernelStatus::Unsupported, &failure)
538 }
539 })
540 }
541
542 fn declaration_kinds(
543 &mut self,
544 names: &[String],
545 progress: bool,
546 writer: &ProtocolWriter,
547 ) -> LeanResult<Vec<String>> {
548 if progress {
549 let total = Some(u64::try_from(names.len()).unwrap_or(u64::MAX));
550 let mut out = Vec::with_capacity(names.len());
551 for (idx, name) in names.iter().enumerate() {
552 out.push(self.session.declaration_kind(name, None)?);
553 emit_progress(
554 writer,
555 "declaration_kind_bulk",
556 u64::try_from(idx.saturating_add(1)).unwrap_or(u64::MAX),
557 total,
558 );
559 }
560 Ok(out)
561 } else {
562 let refs: Vec<&str> = names.iter().map(String::as_str).collect();
563 self.session.declaration_kind_bulk(&refs, None, None)
564 }
565 }
566
567 fn declaration_names(
568 &mut self,
569 names: &[String],
570 progress: bool,
571 writer: &ProtocolWriter,
572 ) -> LeanResult<Vec<String>> {
573 if progress {
574 let total = Some(u64::try_from(names.len()).unwrap_or(u64::MAX));
575 let mut out = Vec::with_capacity(names.len());
576 for (idx, name) in names.iter().enumerate() {
577 out.push(self.session.declaration_name(name, None)?);
578 emit_progress(
579 writer,
580 "declaration_name_bulk",
581 u64::try_from(idx.saturating_add(1)).unwrap_or(u64::MAX),
582 total,
583 );
584 }
585 Ok(out)
586 } else {
587 let refs: Vec<&str> = names.iter().map(String::as_str).collect();
588 self.session.declaration_name_bulk(&refs, None, None)
589 }
590 }
591
592 fn run_data_stream(
593 &mut self,
594 export: &str,
595 request_json: &str,
596 progress: bool,
597 writer: &ProtocolWriter,
598 ) -> Result<StreamSummary, StreamRunError> {
599 if progress {
600 emit_progress(writer, "data_stream", 0, None);
601 }
602
603 let started = Instant::now();
604 let forwarder = Arc::new(Mutex::new(StreamForwarder::new(writer.clone(), progress)));
605 let row_error = Arc::new(Mutex::new(None::<StreamCallbackError>));
606 let callback_forwarder = Arc::clone(&forwarder);
607 let callback_error = Arc::clone(&row_error);
608 let callback = LeanCallbackHandle::<LeanStringEvent>::register(move |event| {
609 if callback_error.lock().map_or(true, |guard| guard.is_some()) {
610 return LeanCallbackFlow::Stop;
611 }
612 match parse_row_envelope(&event.value) {
613 Ok(StreamCallbackEvent::Row(row)) => match callback_forwarder.lock() {
614 Ok(mut guard) => match guard.emit_row(row) {
615 Ok(()) => LeanCallbackFlow::Continue,
616 Err(err) => {
617 if let Ok(mut guard) = callback_error.lock() {
618 *guard = Some(StreamCallbackError::Write(err.to_string()));
619 }
620 LeanCallbackFlow::Stop
621 }
622 },
623 Err(_) => {
624 if let Ok(mut guard) = callback_error.lock() {
625 *guard = Some(StreamCallbackError::Malformed(
626 "stream forwarder mutex was poisoned".to_owned(),
627 ));
628 }
629 LeanCallbackFlow::Stop
630 }
631 },
632 Ok(StreamCallbackEvent::Diagnostic(diagnostic)) => match callback_forwarder.lock() {
633 Ok(guard) => match guard.emit_diagnostic(diagnostic) {
634 Ok(()) => LeanCallbackFlow::Continue,
635 Err(err) => {
636 if let Ok(mut guard) = callback_error.lock() {
637 *guard = Some(StreamCallbackError::Write(err.to_string()));
638 }
639 LeanCallbackFlow::Stop
640 }
641 },
642 Err(_) => {
643 if let Ok(mut guard) = callback_error.lock() {
644 *guard = Some(StreamCallbackError::Malformed(
645 "stream forwarder mutex was poisoned".to_owned(),
646 ));
647 }
648 LeanCallbackFlow::Stop
649 }
650 },
651 Ok(StreamCallbackEvent::Progress(progress)) => match callback_forwarder.lock() {
652 Ok(guard) => match guard.emit_progress(progress) {
653 Ok(()) => LeanCallbackFlow::Continue,
654 Err(err) => {
655 if let Ok(mut guard) = callback_error.lock() {
656 *guard = Some(StreamCallbackError::Write(err.to_string()));
657 }
658 LeanCallbackFlow::Stop
659 }
660 },
661 Err(_) => {
662 if let Ok(mut guard) = callback_error.lock() {
663 *guard = Some(StreamCallbackError::Malformed(
664 "stream forwarder mutex was poisoned".to_owned(),
665 ));
666 }
667 LeanCallbackFlow::Stop
668 }
669 },
670 Ok(StreamCallbackEvent::Metadata(metadata)) => match callback_forwarder.lock() {
671 Ok(mut guard) => {
672 guard.set_metadata(metadata);
673 LeanCallbackFlow::Continue
674 }
675 Err(_) => {
676 if let Ok(mut guard) = callback_error.lock() {
677 *guard = Some(StreamCallbackError::Malformed(
678 "stream forwarder mutex was poisoned".to_owned(),
679 ));
680 }
681 LeanCallbackFlow::Stop
682 }
683 },
684 Err(message) => {
685 if let Ok(mut guard) = callback_error.lock() {
686 *guard = Some(StreamCallbackError::Malformed(message));
687 }
688 LeanCallbackFlow::Stop
689 }
690 }
691 })
692 .map_err(StreamRunError::Host)?;
693
694 let (handle, trampoline) = callback.abi_parts();
695 let status = self
696 .session
697 .call_capability::<(&str, usize, usize), LeanIo<u8>>(export, (request_json, handle, trampoline), None)
698 .map_err(StreamRunError::Host)?;
699
700 if let Some(error) = row_error.lock().ok().and_then(|mut guard| guard.take()) {
701 return Err(match error {
702 StreamCallbackError::Malformed(message) => StreamRunError::MalformedRow(message),
703 StreamCallbackError::Write(message) => {
704 StreamRunError::Host(host_internal(format!("worker stream frame write failed: {message}")))
705 }
706 });
707 }
708
709 match LeanCallbackStatus::from_abi(status) {
710 Some(LeanCallbackStatus::Ok) => {}
711 Some(status) => return Err(StreamRunError::CallbackStatus(status)),
712 None => return Err(StreamRunError::ExportStatus(status)),
713 }
714
715 let guard = forwarder
716 .lock()
717 .map_err(|_| StreamRunError::MalformedRow("stream forwarder mutex was poisoned".to_owned()))?;
718 Ok(guard.summary(started.elapsed()))
719 }
720
721 fn capability_metadata(
722 &mut self,
723 export: &str,
724 request_json: &str,
725 ) -> Result<LeanWorkerCapabilityMetadata, CapabilityJsonError> {
726 let raw = self
727 .session
728 .call_capability::<(&str,), LeanIo<String>>(export, (request_json,), None)
729 .map_err(CapabilityJsonError::Host)?;
730 serde_json::from_str(&raw).map_err(|err| CapabilityJsonError::Malformed(err.to_string()))
731 }
732
733 fn capability_doctor(
734 &mut self,
735 export: &str,
736 request_json: &str,
737 ) -> Result<LeanWorkerDoctorReport, CapabilityJsonError> {
738 let raw = self
739 .session
740 .call_capability::<(&str,), LeanIo<String>>(export, (request_json,), None)
741 .map_err(CapabilityJsonError::Host)?;
742 serde_json::from_str(&raw).map_err(|err| CapabilityJsonError::Malformed(err.to_string()))
743 }
744
745 fn json_command(&mut self, export: &str, request_json: &str) -> LeanResult<String> {
746 self.session
747 .call_capability::<(&str,), LeanIo<String>>(export, (request_json,), None)
748 }
749
750 fn infer_type(
751 &mut self,
752 source: &str,
753 options: &LeanWorkerElabOptions,
754 ) -> LeanResult<LeanWorkerMetaResult<LeanWorkerRendered>> {
755 let elab_options = options.to_host_options();
756 let elab_outcome = self.session.elaborate(source, None, &elab_options, None)?;
757 let expr = match elab_outcome {
758 Ok(expr) => expr,
759 Err(failure) => return Ok(meta_failure_from_elab(&failure)),
760 };
761 let meta_options = options.to_host_meta_options(LeanMetaTransparency::Default);
762 let response = self.session.run_meta(&meta::infer_type(), expr, &meta_options, None)?;
763 meta_render_expr(&mut self.session, response, &meta_options)
764 }
765
766 fn whnf(
767 &mut self,
768 source: &str,
769 options: &LeanWorkerElabOptions,
770 ) -> LeanResult<LeanWorkerMetaResult<LeanWorkerRendered>> {
771 let elab_options = options.to_host_options();
772 let elab_outcome = self.session.elaborate(source, None, &elab_options, None)?;
773 let expr = match elab_outcome {
774 Ok(expr) => expr,
775 Err(failure) => return Ok(meta_failure_from_elab(&failure)),
776 };
777 let meta_options = options.to_host_meta_options(LeanMetaTransparency::Default);
778 let response = self.session.run_meta(&meta::whnf(), expr, &meta_options, None)?;
779 meta_render_expr(&mut self.session, response, &meta_options)
780 }
781
782 fn is_def_eq(
783 &mut self,
784 lhs: &str,
785 rhs: &str,
786 transparency: LeanWorkerMetaTransparency,
787 options: &LeanWorkerElabOptions,
788 ) -> LeanResult<LeanWorkerMetaResult<bool>> {
789 let elab_options = options.to_host_options();
790 let lhs_outcome = self.session.elaborate(lhs, None, &elab_options, None)?;
791 let lhs_expr = match lhs_outcome {
792 Ok(expr) => expr,
793 Err(failure) => return Ok(meta_failure_from_elab(&failure)),
794 };
795 let rhs_outcome = self.session.elaborate(rhs, None, &elab_options, None)?;
796 let rhs_expr = match rhs_outcome {
797 Ok(expr) => expr,
798 Err(failure) => return Ok(meta_failure_from_elab(&failure)),
799 };
800 let transparency_host = transparency.into();
801 let meta_options = options.to_host_meta_options(transparency_host);
802 let response = self.session.run_meta(
803 &meta::is_def_eq(),
804 (lhs_expr, rhs_expr, transparency_host),
805 &meta_options,
806 None,
807 )?;
808 match response {
809 LeanMetaResponse::Ok(value) => Ok(LeanWorkerMetaResult::Ok { value }),
810 LeanMetaResponse::Failed(failure) => Ok(LeanWorkerMetaResult::Failed {
811 failure: elab_failure_wire(&failure),
812 }),
813 LeanMetaResponse::TimeoutOrHeartbeat(failure) => Ok(LeanWorkerMetaResult::TimeoutOrHeartbeat {
814 failure: elab_failure_wire(&failure),
815 }),
816 LeanMetaResponse::Unsupported(failure) => Ok(LeanWorkerMetaResult::Unsupported {
817 failure: elab_failure_wire(&failure),
818 }),
819 }
820 }
821
822 fn describe(&mut self, name: &str) -> LeanResult<Option<LeanWorkerDeclarationRow>> {
823 let kind = self.session.declaration_kind(name, None)?;
824 if kind == "missing" {
825 return Ok(None);
826 }
827 let type_signature = match self.session.declaration_type(name, None)? {
828 Some(expr) => Some(self.session.expr_to_string_raw(&expr, None)?),
829 None => None,
830 };
831 let source = self
832 .session
833 .declaration_source_range(name, None)?
834 .map(source_range_wire);
835 Ok(Some(LeanWorkerDeclarationRow {
836 name: name.to_owned(),
837 kind,
838 type_signature,
839 source,
840 }))
841 }
842
843 fn list_declarations_strings(
844 &mut self,
845 filter: LeanWorkerDeclarationFilter,
846 progress: bool,
847 writer: &ProtocolWriter,
848 ) -> LeanResult<u64> {
849 let host_filter = LeanDeclarationFilter {
850 include_private: filter.include_private,
851 include_generated: filter.include_generated,
852 include_internal: filter.include_internal,
853 };
854 if progress {
855 emit_progress(writer, "list_declarations_strings", 0, None);
856 }
857 let names = self.session.list_declarations_strings(&host_filter, None, None)?;
858 let total = u64::try_from(names.len()).unwrap_or(u64::MAX);
859 let mut emitter = DataRowEmitter::default();
860 for name in names {
861 let payload = serde_json::value::to_raw_value(&name)
862 .map_err(|err| host_internal(format!("list_declarations_strings row payload encode failed: {err}")))?;
863 let row = emitter.next("rows", payload);
864 writer
865 .write(Message::DataRow(row))
866 .map_err(|err| host_internal(format!("list_declarations_strings row frame write failed: {err}")))?;
867 }
868 if progress {
869 emit_progress(writer, "list_declarations_strings", total, Some(total));
870 }
871 Ok(emitter.count())
872 }
873
874 fn describe_bulk(
875 &mut self,
876 names: &[String],
877 progress: bool,
878 writer: &ProtocolWriter,
879 ) -> LeanResult<Vec<LeanWorkerDeclarationRow>> {
880 let refs: Vec<&str> = names.iter().map(String::as_str).collect();
881 let kinds = self.session.declaration_kind_bulk(&refs, None, None)?;
882 let types = self.session.declaration_type_bulk(&refs, None, None)?;
883 let total = Some(u64::try_from(names.len()).unwrap_or(u64::MAX));
884 let mut rows = Vec::with_capacity(names.len());
885 for (idx, name) in names.iter().enumerate() {
886 let kind = kinds.get(idx).cloned().unwrap_or_else(|| "missing".to_owned());
887 let row = if kind == "missing" {
888 LeanWorkerDeclarationRow {
889 name: name.clone(),
890 kind,
891 type_signature: None,
892 source: None,
893 }
894 } else {
895 let type_signature = match types.get(idx).and_then(Option::as_ref) {
896 Some(expr) => Some(self.session.expr_to_string_raw(expr, None)?),
897 None => None,
898 };
899 let source = self
900 .session
901 .declaration_source_range(name, None)?
902 .map(source_range_wire);
903 LeanWorkerDeclarationRow {
904 name: name.clone(),
905 kind,
906 type_signature,
907 source,
908 }
909 };
910 rows.push(row);
911 if progress {
912 emit_progress(
913 writer,
914 "describe_bulk",
915 u64::try_from(idx.saturating_add(1)).unwrap_or(u64::MAX),
916 total,
917 );
918 }
919 }
920 Ok(rows)
921 }
922
923 fn process_file(
924 &mut self,
925 source: &str,
926 options: &LeanWorkerElabOptions,
927 ) -> LeanResult<LeanWorkerProcessFileOutcome> {
928 let options = options.to_host_options();
929 Ok(match self.session.process_with_info_tree(source, &options, None)? {
930 ProcessFileOutcome::Processed(file) => LeanWorkerProcessFileOutcome::Processed {
931 file: processed_file_wire(file),
932 },
933 ProcessFileOutcome::Unsupported => LeanWorkerProcessFileOutcome::Unsupported,
934 })
935 }
936
937 fn process_module(
938 &mut self,
939 source: &str,
940 options: &LeanWorkerElabOptions,
941 ) -> LeanResult<LeanWorkerProcessModuleOutcome> {
942 let options = options.to_host_options();
943 Ok(
944 match self.session.process_module_with_info_tree(source, &options, None)? {
945 ProcessModuleOutcome::Ok { file, imports } => LeanWorkerProcessModuleOutcome::Ok {
946 file: processed_file_wire(file),
947 imports,
948 },
949 ProcessModuleOutcome::MissingImports { file, imports, missing } => {
950 LeanWorkerProcessModuleOutcome::MissingImports {
951 file: processed_file_wire(file),
952 imports,
953 missing,
954 }
955 }
956 ProcessModuleOutcome::HeaderParseFailed { diagnostics } => {
957 LeanWorkerProcessModuleOutcome::HeaderParseFailed {
958 diagnostics: elab_failure_wire(&diagnostics),
959 }
960 }
961 ProcessModuleOutcome::Unsupported => LeanWorkerProcessModuleOutcome::Unsupported,
962 },
963 )
964 }
965}
966
967#[derive(Clone, Debug)]
968struct PendingDataRow {
969 stream: String,
970 payload: Box<RawValue>,
971}
972
973enum StreamCallbackEvent {
974 Row(PendingDataRow),
975 Diagnostic(Diagnostic),
976 Progress(ProgressTick),
977 Metadata(serde_json::Value),
978}
979
980enum StreamCallbackError {
981 Malformed(String),
982 Write(String),
983}
984
985struct StreamForwarder {
986 writer: ProtocolWriter,
987 emitter: DataRowEmitter,
988 progress: bool,
989 metadata: Option<serde_json::Value>,
990}
991
992impl StreamForwarder {
993 fn new(writer: ProtocolWriter, progress: bool) -> Self {
994 Self {
995 writer,
996 emitter: DataRowEmitter::default(),
997 progress,
998 metadata: None,
999 }
1000 }
1001
1002 fn emit_row(&mut self, row: PendingDataRow) -> Result<(), ProtocolError> {
1003 let row = self.emitter.next(row.stream, row.payload);
1004 self.writer.write(Message::DataRow(row))?;
1005 if self.progress {
1006 emit_progress(&self.writer, "data_stream", self.emitter.count(), None);
1007 }
1008 Ok(())
1009 }
1010
1011 fn emit_diagnostic(&self, diagnostic: Diagnostic) -> Result<(), ProtocolError> {
1012 self.writer.write(Message::Diagnostic(diagnostic))
1013 }
1014
1015 fn emit_progress(&self, progress: ProgressTick) -> Result<(), ProtocolError> {
1016 self.writer.write(Message::ProgressTick(progress))
1017 }
1018
1019 fn set_metadata(&mut self, metadata: serde_json::Value) {
1020 self.metadata = Some(metadata);
1021 }
1022
1023 fn summary(&self, elapsed: std::time::Duration) -> StreamSummary {
1024 StreamSummary::new(
1025 self.emitter.count(),
1026 self.emitter.per_stream_counts(),
1027 elapsed,
1028 self.metadata.clone(),
1029 )
1030 }
1031}
1032
1033#[derive(Debug)]
1034enum StreamRunError {
1035 Host(LeanError),
1036 ExportStatus(u8),
1037 CallbackStatus(LeanCallbackStatus),
1038 MalformedRow(String),
1039}
1040
1041enum CapabilityJsonError {
1042 Host(LeanError),
1043 Malformed(String),
1044}
1045
1046impl From<crate::protocol::ProtocolError> for StreamRunError {
1047 fn from(value: crate::protocol::ProtocolError) -> Self {
1048 Self::Host(host_internal(format!("worker data-row frame write failed: {value}")))
1049 }
1050}
1051
1052fn parse_row_envelope(raw: &str) -> Result<StreamCallbackEvent, String> {
1053 let envelope: RowCallbackEnvelope =
1054 serde_json::from_str(raw).map_err(|err| format!("row callback payload is not valid JSON: {err}"))?;
1055 if let Some(diagnostic) = envelope.diagnostic {
1056 let code = diagnostic
1057 .code
1058 .filter(|value| !value.is_empty())
1059 .ok_or_else(|| "diagnostic callback payload must contain a non-empty string field `code`".to_owned())?;
1060 let message = diagnostic
1061 .message
1062 .ok_or_else(|| "diagnostic callback payload must contain a string field `message`".to_owned())?;
1063 return Ok(StreamCallbackEvent::Diagnostic(Diagnostic { code, message }));
1064 }
1065 if let Some(progress) = envelope.progress {
1066 let phase = progress
1067 .phase
1068 .filter(|value| !value.is_empty())
1069 .ok_or_else(|| "progress callback payload must contain a non-empty string field `phase`".to_owned())?;
1070 return Ok(StreamCallbackEvent::Progress(ProgressTick {
1071 phase,
1072 current: progress.current,
1073 total: progress.total,
1074 }));
1075 }
1076 if let Some(metadata) = envelope.metadata {
1077 let metadata = serde_json::from_str(metadata.get())
1078 .map_err(|err| format!("metadata callback payload is not valid JSON: {err}"))?;
1079 return Ok(StreamCallbackEvent::Metadata(metadata));
1080 }
1081 let stream = envelope
1082 .stream
1083 .filter(|value| !value.is_empty())
1084 .ok_or_else(|| "row callback payload must contain a non-empty string field `stream`".to_owned())?;
1085 let payload = envelope
1086 .payload
1087 .ok_or_else(|| "row callback payload must contain field `payload`".to_owned())?;
1088 Ok(StreamCallbackEvent::Row(PendingDataRow { stream, payload }))
1089}
1090
1091#[derive(Deserialize)]
1092struct RowCallbackEnvelope {
1093 stream: Option<String>,
1094 payload: Option<Box<RawValue>>,
1095 diagnostic: Option<RowCallbackDiagnostic>,
1096 progress: Option<RowCallbackProgress>,
1097 metadata: Option<Box<RawValue>>,
1098}
1099
1100#[derive(Deserialize)]
1101struct RowCallbackDiagnostic {
1102 code: Option<String>,
1103 message: Option<String>,
1104}
1105
1106#[derive(Deserialize)]
1107struct RowCallbackProgress {
1108 phase: Option<String>,
1109 current: u64,
1110 total: Option<u64>,
1111}
1112
1113impl LeanWorkerElabOptions {
1114 fn to_host_options(&self) -> LeanElabOptions {
1115 LeanElabOptions::new()
1116 .namespace_context(&self.namespace_context)
1117 .file_label(&self.file_label)
1118 .heartbeat_limit(self.heartbeat_limit)
1119 .diagnostic_byte_limit(self.diagnostic_byte_limit)
1120 }
1121
1122 fn to_host_meta_options(&self, transparency: LeanMetaTransparency) -> LeanMetaOptions {
1123 LeanMetaOptions::new()
1124 .namespace_context(&self.namespace_context)
1125 .heartbeat_limit(self.heartbeat_limit)
1126 .diagnostic_byte_limit(self.diagnostic_byte_limit)
1127 .transparency(transparency)
1128 }
1129}
1130
1131impl From<LeanWorkerMetaTransparency> for LeanMetaTransparency {
1132 fn from(value: LeanWorkerMetaTransparency) -> Self {
1133 match value {
1134 LeanWorkerMetaTransparency::Default => Self::Default,
1135 LeanWorkerMetaTransparency::Reducible => Self::Reducible,
1136 LeanWorkerMetaTransparency::Instances => Self::Instances,
1137 LeanWorkerMetaTransparency::All => Self::All,
1138 }
1139 }
1140}
1141
1142fn elab_failure_wire(failure: &LeanElabFailure) -> LeanWorkerElabFailure {
1143 LeanWorkerElabFailure {
1144 diagnostics: diagnostics(failure),
1145 truncated: failure.truncated(),
1146 }
1147}
1148
1149fn meta_failure_from_elab<T>(failure: &LeanElabFailure) -> LeanWorkerMetaResult<T> {
1150 LeanWorkerMetaResult::Failed {
1151 failure: elab_failure_wire(failure),
1152 }
1153}
1154
1155fn meta_render_expr(
1156 session: &mut LeanSession<'static, 'static>,
1157 response: LeanMetaResponse<lean_rs::LeanExpr<'static>>,
1158 meta_options: &LeanMetaOptions,
1159) -> LeanResult<LeanWorkerMetaResult<LeanWorkerRendered>> {
1160 let expr = match response {
1161 LeanMetaResponse::Ok(expr) => expr,
1162 LeanMetaResponse::Failed(failure) => {
1163 return Ok(LeanWorkerMetaResult::Failed {
1164 failure: elab_failure_wire(&failure),
1165 });
1166 }
1167 LeanMetaResponse::TimeoutOrHeartbeat(failure) => {
1168 return Ok(LeanWorkerMetaResult::TimeoutOrHeartbeat {
1169 failure: elab_failure_wire(&failure),
1170 });
1171 }
1172 LeanMetaResponse::Unsupported(failure) => {
1173 return Ok(LeanWorkerMetaResult::Unsupported {
1174 failure: elab_failure_wire(&failure),
1175 });
1176 }
1177 };
1178 let pp_response = session.run_meta(&meta::pp_expr(), expr.clone(), meta_options, None)?;
1179 Ok(match pp_response {
1180 LeanMetaResponse::Ok(rendered) => LeanWorkerMetaResult::Ok {
1181 value: LeanWorkerRendered {
1182 value: rendered,
1183 rendering: LeanWorkerRendering::Pretty,
1184 },
1185 },
1186 LeanMetaResponse::Unsupported(_) => LeanWorkerMetaResult::Ok {
1187 value: LeanWorkerRendered {
1188 value: session.expr_to_string_raw(&expr, None)?,
1189 rendering: LeanWorkerRendering::Raw,
1190 },
1191 },
1192 LeanMetaResponse::Failed(failure) => LeanWorkerMetaResult::Failed {
1193 failure: elab_failure_wire(&failure),
1194 },
1195 LeanMetaResponse::TimeoutOrHeartbeat(failure) => LeanWorkerMetaResult::TimeoutOrHeartbeat {
1196 failure: elab_failure_wire(&failure),
1197 },
1198 })
1199}
1200
1201fn source_range_wire(range: LeanSourceRange) -> LeanWorkerSourceRange {
1202 LeanWorkerSourceRange {
1203 file: range.file,
1204 start_line: range.start_line,
1205 start_column: range.start_column,
1206 end_line: range.end_line,
1207 end_column: range.end_column,
1208 }
1209}
1210
1211fn command_info_wire(node: CommandInfoNode) -> LeanWorkerCommandInfo {
1212 LeanWorkerCommandInfo {
1213 start_line: node.start_line,
1214 start_column: node.start_column,
1215 end_line: node.end_line,
1216 end_column: node.end_column,
1217 decl_name: node.decl_name,
1218 }
1219}
1220
1221fn term_info_wire(node: TermInfoNode) -> LeanWorkerTermInfo {
1222 LeanWorkerTermInfo {
1223 start_line: node.start_line,
1224 start_column: node.start_column,
1225 end_line: node.end_line,
1226 end_column: node.end_column,
1227 expr_str: node.expr_str,
1228 type_str: node.type_str,
1229 expected_type_str: node.expected_type_str,
1230 }
1231}
1232
1233fn tactic_info_wire(node: TacticInfoNode) -> LeanWorkerTacticInfo {
1234 LeanWorkerTacticInfo {
1235 start_line: node.start_line,
1236 start_column: node.start_column,
1237 end_line: node.end_line,
1238 end_column: node.end_column,
1239 goals_before: node.goals_before,
1240 goals_after: node.goals_after,
1241 }
1242}
1243
1244fn name_ref_wire(node: NameRefNode) -> LeanWorkerNameRef {
1245 LeanWorkerNameRef {
1246 start_line: node.start_line,
1247 start_column: node.start_column,
1248 end_line: node.end_line,
1249 end_column: node.end_column,
1250 name: node.name,
1251 is_binder: node.is_binder,
1252 }
1253}
1254
1255fn processed_file_wire(file: ProcessedFile) -> LeanWorkerProcessedFile {
1256 LeanWorkerProcessedFile {
1257 commands: file.commands.into_iter().map(command_info_wire).collect(),
1258 terms: file.terms.into_iter().map(term_info_wire).collect(),
1259 tactics: file.tactics.into_iter().map(tactic_info_wire).collect(),
1260 names: file.names.into_iter().map(name_ref_wire).collect(),
1261 diagnostics: elab_failure_wire(&file.diagnostics),
1262 }
1263}
1264
1265fn elab_failure_outcome(failure: &LeanElabFailure) -> LeanWorkerElabResult {
1266 LeanWorkerElabResult {
1267 success: false,
1268 diagnostics: diagnostics(failure),
1269 truncated: failure.truncated(),
1270 }
1271}
1272
1273fn kernel_failure_outcome(status: LeanWorkerKernelStatus, failure: &LeanElabFailure) -> LeanWorkerKernelResult {
1274 LeanWorkerKernelResult {
1275 status,
1276 diagnostics: diagnostics(failure),
1277 truncated: failure.truncated(),
1278 summary: None,
1279 }
1280}
1281
1282fn diagnostics(failure: &LeanElabFailure) -> Vec<LeanWorkerDiagnostic> {
1283 failure
1284 .diagnostics()
1285 .iter()
1286 .map(|diagnostic| {
1287 let (line, column, end_line, end_column) =
1288 diagnostic.position().map_or((None, None, None, None), |position| {
1289 (
1290 Some(position.line()),
1291 Some(position.column()),
1292 position.end_line(),
1293 position.end_column(),
1294 )
1295 });
1296 LeanWorkerDiagnostic {
1297 severity: match diagnostic.severity() {
1298 LeanSeverity::Info => "info",
1299 LeanSeverity::Warning => "warning",
1300 LeanSeverity::Error => "error",
1301 }
1302 .to_owned(),
1303 message: diagnostic.message().to_owned(),
1304 file_label: diagnostic.file_label().to_owned(),
1305 line,
1306 column,
1307 end_line,
1308 end_column,
1309 }
1310 })
1311 .collect()
1312}
1313
1314fn emit_progress(writer: &ProtocolWriter, phase: &str, current: u64, total: Option<u64>) {
1315 drop(writer.write(Message::ProgressTick(ProgressTick {
1316 phase: phase.to_owned(),
1317 current,
1318 total,
1319 })));
1320}
1321
1322fn emit_test_rows(writer: &ProtocolWriter, streams: &[String]) -> Result<u64, crate::protocol::ProtocolError> {
1323 let mut emitter = DataRowEmitter::default();
1324 for (idx, stream) in streams.iter().enumerate() {
1325 let payload = serde_json::value::to_raw_value(&serde_json::json!({
1326 "stream": stream,
1327 "index": idx,
1328 }))?;
1329 let row = emitter.next(stream.clone(), payload);
1330 writer.write(Message::DataRow(row))?;
1331 }
1332 Ok(emitter.count())
1333}
1334
1335#[allow(dead_code, reason = "reserved for future worker configuration paths")]
1336fn _path_for_diagnostics(path: &Path) -> PathBuf {
1337 path.to_path_buf()
1338}