Skip to main content

lean_rs_worker/
child.rs

1use std::path::{Path, PathBuf};
2use std::process::ExitCode;
3use std::sync::{Arc, Mutex};
4use std::time::Instant;
5
6use lean_rs::error::host_internal;
7use lean_rs::module::LeanIo;
8use lean_rs::{
9    LeanCallbackFlow, LeanCallbackHandle, LeanCallbackStatus, LeanError, LeanResult, LeanRuntime, LeanStringEvent,
10};
11use lean_rs_host::host::process::{
12    CommandInfoNode, NameRefNode, ProcessFileOutcome, ProcessModuleOutcome, ProcessedFile, TacticInfoNode, TermInfoNode,
13};
14use lean_rs_host::meta::{self, LeanMetaOptions, LeanMetaResponse, LeanMetaTransparency};
15use lean_rs_host::{
16    LeanCapabilities, LeanDeclarationFilter, LeanElabFailure, LeanElabOptions, LeanHost, LeanKernelOutcome,
17    LeanSession, LeanSeverity, LeanSourceRange,
18};
19use serde::Deserialize;
20use serde_json::value::RawValue;
21
22use crate::protocol::{
23    DataRowEmitter, Diagnostic, Message, ProgressTick, ProtocolError, Request, Response, StreamSummary, read_frame,
24    write_frame,
25};
26use crate::types::{
27    LeanWorkerCapabilityMetadata, LeanWorkerCommandInfo, LeanWorkerDeclarationFilter, LeanWorkerDeclarationRow,
28    LeanWorkerDiagnostic, LeanWorkerDoctorReport, LeanWorkerElabFailure, LeanWorkerElabOptions, LeanWorkerElabResult,
29    LeanWorkerKernelResult, LeanWorkerKernelStatus, LeanWorkerKernelSummary, LeanWorkerMetaResult,
30    LeanWorkerMetaTransparency, LeanWorkerNameRef, LeanWorkerProcessFileOutcome, LeanWorkerProcessModuleOutcome,
31    LeanWorkerProcessedFile, LeanWorkerRendered, LeanWorkerRendering, LeanWorkerSourceRange, LeanWorkerTacticInfo,
32    LeanWorkerTermInfo,
33};
34
35#[derive(Clone)]
36struct ProtocolWriter {
37    stdout: Arc<Mutex<std::io::Stdout>>,
38}
39
40impl ProtocolWriter {
41    fn new() -> Self {
42        Self {
43            stdout: Arc::new(Mutex::new(std::io::stdout())),
44        }
45    }
46
47    fn write(&self, message: Message) -> Result<(), ProtocolError> {
48        let mut stdout = self
49            .stdout
50            .lock()
51            .map_err(|_| ProtocolError::Io(std::io::Error::other("worker stdout mutex was poisoned")))?;
52        write_frame(&mut *stdout, message)
53    }
54}
55
56pub(crate) fn run_stdio() -> ExitCode {
57    install_immediate_abort_exit();
58    match serve_stdio() {
59        Ok(()) => ExitCode::SUCCESS,
60        Err(err) => {
61            eprintln!("lean-rs-worker-child: {err}");
62            ExitCode::FAILURE
63        }
64    }
65}
66
67/// Convert any `SIGABRT` the worker child receives into an immediate
68/// `_exit(134)`, bypassing kernel core-dump machinery and any libc/runtime
69/// residual cleanup on `abort()`.
70///
71/// Lean internal panics with `LEAN_ABORT_ON_PANIC=1` (the worker child's
72/// default) terminate the child via `abort()` → `SIGABRT`. The kernel's
73/// default action for `SIGABRT` is *terminate with core dump*, and on
74/// GitHub Actions `ubuntu-latest` the inherited `core_pattern` pipes the
75/// image to `apport` (or `systemd-coredump`). The kernel holds the dying
76/// child's file descriptors open while the handler drains the pipe; for a
77/// child that has loaded `libleanshared.so` plus a capability dylib chain,
78/// the observed delay is tens of seconds, long enough that the parent
79/// supervisor's per-request timeout fires before it can see EOF on the
80/// child's stdout and translate it to `LeanWorkerError::ChildPanicOrAbort`.
81///
82/// `setrlimit(RLIMIT_CORE, 0)` and `prctl(PR_SET_DUMPABLE, 0)` are
83/// independently advertised as "no core dump" knobs; in practice on the
84/// `ubuntu-latest` runner they reduce the delay substantially but do not
85/// eliminate it (observed: ~107 s without either, ~23 s with `setrlimit`
86/// alone — still above the supervisor's 30 s budget). The decisive fix is
87/// to take over `SIGABRT` ourselves: a `sigaction` handler that calls
88/// `_exit(134)` short-circuits the entire kernel signal-default path,
89/// closes the pipes immediately, and lets the parent observe the fatal
90/// exit on normal IPC timescales.
91///
92/// The diagnostic the parent surfaces to callers does not include a core
93/// file in any supported configuration: typed errors (`ChildPanicOrAbort`,
94/// `Worker { code, message }`) and the captured child stderr cover the
95/// supported failure surface. Worker children therefore have no use for
96/// core dumps, and suppressing them is the right boundary policy.
97///
98/// We also keep the `RLIMIT_CORE` and `PR_SET_DUMPABLE` knobs as a
99/// defence-in-depth: if anything later in the child's lifetime overwrites
100/// the `SIGABRT` handler (e.g. a future Lean runtime that installs its own
101/// signal handler during init), the kernel default action then runs but
102/// the core-dump step is still skipped, preserving the post-`setrlimit`
103/// timing rather than regressing to the unfixed ~107 s.
104#[cfg(unix)]
105#[allow(
106    unsafe_code,
107    reason = "installing a signal handler and calling setrlimit/prctl require libc FFI"
108)]
109fn install_immediate_abort_exit() {
110    extern "C" fn on_sigabrt(_sig: libc::c_int) {
111        // SAFETY: `write` and `_exit` are async-signal-safe per POSIX.
112        // The marker lets test stderr distinguish this exit path from a
113        // raw kernel-default `SIGABRT` termination.
114        const MARKER: &[u8] = b"lean-rs-worker child: SIGABRT, exiting immediately\n";
115        unsafe {
116            let _ = libc::write(libc::STDERR_FILENO, MARKER.as_ptr().cast(), MARKER.len());
117            libc::_exit(134);
118        }
119    }
120
121    // SAFETY: zero-initialised `sigaction` is valid; we then populate the
122    // handler and flags fields. The call modifies process-global state only
123    // and has no aliasing or lifetime concerns. The handler itself uses
124    // only async-signal-safe calls.
125    unsafe {
126        let mut action: libc::sigaction = std::mem::zeroed();
127        action.sa_sigaction = on_sigabrt as *const () as libc::sighandler_t;
128        libc::sigemptyset(&raw mut action.sa_mask);
129        action.sa_flags = libc::SA_RESETHAND;
130        let _ = libc::sigaction(libc::SIGABRT, &raw const action, std::ptr::null_mut());
131    }
132
133    // SAFETY: defence-in-depth. `setrlimit` and `prctl` modify
134    // process-global state only and have no aliasing or lifetime concerns.
135    // Return values are deliberately ignored: the worst case is that the
136    // OS does not honour the request and we fall back on the `sigaction`
137    // handler installed above.
138    unsafe {
139        let limit = libc::rlimit {
140            rlim_cur: 0,
141            rlim_max: 0,
142        };
143        let _ = libc::setrlimit(libc::RLIMIT_CORE, &raw const limit);
144        #[cfg(target_os = "linux")]
145        {
146            let zero: libc::c_ulong = 0;
147            let _ = libc::prctl(libc::PR_SET_DUMPABLE, zero, zero, zero, zero);
148        }
149    }
150}
151
152#[cfg(not(unix))]
153fn install_immediate_abort_exit() {}
154
155#[allow(
156    clippy::significant_drop_tightening,
157    reason = "the child owns stdin/stdout for the full protocol loop"
158)]
159fn serve_stdio() -> Result<(), Box<dyn std::error::Error>> {
160    let runtime = LeanRuntime::init()?;
161    let stdin = std::io::stdin();
162    let mut reader = stdin.lock();
163    let writer = ProtocolWriter::new();
164    let mut host_session: Option<HostSessionState> = None;
165
166    writer.write(Message::Handshake {
167        worker_version: env!("CARGO_PKG_VERSION").to_owned(),
168        protocol_version: crate::protocol::PROTOCOL_VERSION,
169    })?;
170
171    loop {
172        let frame = read_frame(&mut reader)?;
173        let Message::Request(request) = frame.message else {
174            writer.write(Message::Response(Response::Error {
175                code: "lean_rs.worker.protocol.unexpected_frame".to_owned(),
176                message: "child expected request frame".to_owned(),
177            }))?;
178            continue;
179        };
180
181        match request {
182            Request::Health => {
183                writer.write(Message::Response(Response::HealthOk))?;
184            }
185            Request::LoadFixtureCapability { fixture_root } => {
186                let response = match load_fixture_capability(runtime, Path::new(&fixture_root)) {
187                    Ok(()) => Response::CapabilityLoaded,
188                    Err(err) => error_response(&err),
189                };
190                writer.write(Message::Response(response))?;
191            }
192            Request::CallFixtureMul { fixture_root, lhs, rhs } => {
193                let response = match call_fixture_mul(runtime, Path::new(&fixture_root), lhs, rhs) {
194                    Ok(value) => Response::U64 { value },
195                    Err(err) => error_response(&err),
196                };
197                writer.write(Message::Response(response))?;
198            }
199            Request::TriggerLeanPanic { fixture_root } => {
200                let response = match trigger_lean_panic(runtime, Path::new(&fixture_root)) {
201                    Ok(()) => Response::Error {
202                        code: "lean_rs.worker.panic_fixture_returned".to_owned(),
203                        message: "Lean panic fixture returned instead of terminating the child".to_owned(),
204                    },
205                    Err(err) => error_response(&err),
206                };
207                writer.write(Message::Response(response))?;
208            }
209            Request::OpenHostSession {
210                project_root,
211                package,
212                lib_name,
213                imports,
214            } => {
215                let response = match HostSessionState::open(runtime, &project_root, &package, &lib_name, &imports) {
216                    Ok(state) => {
217                        host_session = Some(state);
218                        Response::HostSessionOpened
219                    }
220                    Err(err) => error_response(&err),
221                };
222                writer.write(Message::Response(response))?;
223            }
224            Request::Elaborate { source, options } => {
225                let response = match host_session.as_mut() {
226                    Some(state) => match state.elaborate(&source, &options) {
227                        Ok(outcome) => Response::Elaboration { outcome },
228                        Err(err) => error_response(&err),
229                    },
230                    None => missing_session_response(),
231                };
232                writer.write(Message::Response(response))?;
233            }
234            Request::KernelCheck {
235                source,
236                options,
237                progress,
238            } => {
239                let response = match host_session.as_mut() {
240                    Some(state) => match state.kernel_check(&source, &options, progress, &writer) {
241                        Ok(outcome) => Response::KernelCheck { outcome },
242                        Err(err) => error_response(&err),
243                    },
244                    None => missing_session_response(),
245                };
246                writer.write(Message::Response(response))?;
247            }
248            Request::DeclarationKinds { names, progress } => {
249                let response = match host_session.as_mut() {
250                    Some(state) => match state.declaration_kinds(&names, progress, &writer) {
251                        Ok(values) => Response::Strings { values },
252                        Err(err) => error_response(&err),
253                    },
254                    None => missing_session_response(),
255                };
256                writer.write(Message::Response(response))?;
257            }
258            Request::DeclarationNames { names, progress } => {
259                let response = match host_session.as_mut() {
260                    Some(state) => match state.declaration_names(&names, progress, &writer) {
261                        Ok(values) => Response::Strings { values },
262                        Err(err) => error_response(&err),
263                    },
264                    None => missing_session_response(),
265                };
266                writer.write(Message::Response(response))?;
267            }
268            Request::RunDataStream {
269                export,
270                request_json,
271                progress,
272            } => {
273                let response = match host_session.as_mut() {
274                    Some(state) => match state.run_data_stream(&export, &request_json, progress, &writer) {
275                        Ok(summary) => Response::StreamComplete { summary },
276                        Err(StreamRunError::Host(err)) => error_response(&err),
277                        Err(StreamRunError::ExportStatus(status)) => {
278                            Response::StreamExportFailed { status_byte: status }
279                        }
280                        Err(StreamRunError::CallbackStatus(status)) => Response::StreamCallbackFailed {
281                            status_byte: status.as_abi(),
282                            description: status.description().to_owned(),
283                        },
284                        Err(StreamRunError::MalformedRow(message)) => Response::StreamRowMalformed { message },
285                    },
286                    None => missing_session_response(),
287                };
288                writer.write(Message::Response(response))?;
289            }
290            Request::CapabilityMetadata { export, request_json } => {
291                let response = match host_session.as_mut() {
292                    Some(state) => match state.capability_metadata(&export, &request_json) {
293                        Ok(metadata) => Response::CapabilityMetadata { metadata },
294                        Err(CapabilityJsonError::Host(err)) => error_response(&err),
295                        Err(CapabilityJsonError::Malformed(message)) => {
296                            Response::CapabilityMetadataMalformed { message }
297                        }
298                    },
299                    None => missing_session_response(),
300                };
301                writer.write(Message::Response(response))?;
302            }
303            Request::CapabilityDoctor { export, request_json } => {
304                let response = match host_session.as_mut() {
305                    Some(state) => match state.capability_doctor(&export, &request_json) {
306                        Ok(report) => Response::CapabilityDoctor { report },
307                        Err(CapabilityJsonError::Host(err)) => error_response(&err),
308                        Err(CapabilityJsonError::Malformed(message)) => Response::CapabilityDoctorMalformed { message },
309                    },
310                    None => missing_session_response(),
311                };
312                writer.write(Message::Response(response))?;
313            }
314            Request::JsonCommand { export, request_json } => {
315                let response = match host_session.as_mut() {
316                    Some(state) => match state.json_command(&export, &request_json) {
317                        Ok(response_json) => Response::JsonCommand { response_json },
318                        Err(err) => error_response(&err),
319                    },
320                    None => missing_session_response(),
321                };
322                writer.write(Message::Response(response))?;
323            }
324            Request::InferType { source, options } => {
325                let response = match host_session.as_mut() {
326                    Some(state) => match state.infer_type(&source, &options) {
327                        Ok(result) => Response::MetaExpr { result },
328                        Err(err) => error_response(&err),
329                    },
330                    None => missing_session_response(),
331                };
332                writer.write(Message::Response(response))?;
333            }
334            Request::Whnf { source, options } => {
335                let response = match host_session.as_mut() {
336                    Some(state) => match state.whnf(&source, &options) {
337                        Ok(result) => Response::MetaExpr { result },
338                        Err(err) => error_response(&err),
339                    },
340                    None => missing_session_response(),
341                };
342                writer.write(Message::Response(response))?;
343            }
344            Request::IsDefEq {
345                lhs,
346                rhs,
347                transparency,
348                options,
349            } => {
350                let response = match host_session.as_mut() {
351                    Some(state) => match state.is_def_eq(&lhs, &rhs, transparency, &options) {
352                        Ok(result) => Response::MetaBool { result },
353                        Err(err) => error_response(&err),
354                    },
355                    None => missing_session_response(),
356                };
357                writer.write(Message::Response(response))?;
358            }
359            Request::Describe { name } => {
360                let response = match host_session.as_mut() {
361                    Some(state) => match state.describe(&name) {
362                        Ok(row) => Response::Declaration { row },
363                        Err(err) => error_response(&err),
364                    },
365                    None => missing_session_response(),
366                };
367                writer.write(Message::Response(response))?;
368            }
369            Request::ListDeclarationsStrings { filter, progress } => {
370                let response = match host_session.as_mut() {
371                    Some(state) => match state.list_declarations_strings(filter, progress, &writer) {
372                        Ok(count) => Response::RowsComplete { count },
373                        Err(err) => error_response(&err),
374                    },
375                    None => missing_session_response(),
376                };
377                writer.write(Message::Response(response))?;
378            }
379            Request::DescribeBulk { names, progress } => {
380                let response = match host_session.as_mut() {
381                    Some(state) => match state.describe_bulk(&names, progress, &writer) {
382                        Ok(rows) => Response::DeclarationBulk { rows },
383                        Err(err) => error_response(&err),
384                    },
385                    None => missing_session_response(),
386                };
387                writer.write(Message::Response(response))?;
388            }
389            Request::ProcessFile { source, options } => {
390                let response = match host_session.as_mut() {
391                    Some(state) => match state.process_file(&source, &options) {
392                        Ok(outcome) => Response::ProcessFile { outcome },
393                        Err(err) => error_response(&err),
394                    },
395                    None => missing_session_response(),
396                };
397                writer.write(Message::Response(response))?;
398            }
399            Request::ProcessModule { source, options } => {
400                let response = match host_session.as_mut() {
401                    Some(state) => match state.process_module(&source, &options) {
402                        Ok(outcome) => Response::ProcessModule { outcome },
403                        Err(err) => error_response(&err),
404                    },
405                    None => missing_session_response(),
406                };
407                writer.write(Message::Response(response))?;
408            }
409            Request::EmitTestRows { streams } => {
410                let count = emit_test_rows(&writer, &streams)?;
411                writer.write(Message::Response(Response::RowsComplete { count }))?;
412            }
413            Request::EmitTestRowsThenExit => {
414                let _count = emit_test_rows(&writer, &["rows".to_owned()])?;
415                return Ok(());
416            }
417            Request::EmitTestRowsThenPanic => {
418                let _count = emit_test_rows(&writer, &["rows".to_owned()])?;
419                std::process::abort();
420            }
421            Request::Terminate => {
422                writer.write(Message::Response(Response::Terminating))?;
423                return Ok(());
424            }
425        }
426    }
427}
428
429fn load_fixture_capability(runtime: &'static LeanRuntime, fixture_root: &Path) -> LeanResult<()> {
430    let host = LeanHost::from_lake_project(runtime, fixture_root)?;
431    let _caps = host.load_capabilities("lean_rs_fixture", "LeanRsFixture")?;
432    Ok(())
433}
434
435fn call_fixture_mul(runtime: &'static LeanRuntime, fixture_root: &Path, lhs: u64, rhs: u64) -> LeanResult<u64> {
436    let host = LeanHost::from_lake_project(runtime, fixture_root)?;
437    let caps = host.load_capabilities("lean_rs_fixture", "LeanRsFixture")?;
438    let mut session = caps.session(&["LeanRsFixture.Scalars"], None, None)?;
439    session.call_capability::<(u64, u64), u64>("lean_rs_fixture_u64_mul", (lhs, rhs), None)
440}
441
442fn trigger_lean_panic(runtime: &'static LeanRuntime, fixture_root: &Path) -> LeanResult<()> {
443    let host = LeanHost::from_lake_project(runtime, fixture_root)?;
444    let caps = host.load_capabilities("lean_rs_fixture", "LeanRsFixture")?;
445    let mut session = caps.session(&["LeanRsFixture.Effects"], None, None)?;
446    session.call_capability::<(u8,), ()>("lean_rs_fixture_panic_unit", (0,), None)
447}
448
449fn error_response(err: &LeanError) -> Response {
450    Response::Error {
451        code: err.code().as_str().to_owned(),
452        message: err.to_string(),
453    }
454}
455
456fn missing_session_response() -> Response {
457    Response::Error {
458        code: "lean_rs.worker.session_missing".to_owned(),
459        message: "open a LeanWorkerSession before sending host-session requests".to_owned(),
460    }
461}
462
463struct HostSessionState {
464    #[allow(dead_code, reason = "leaked host anchors the capability and session lifetimes")]
465    host: &'static LeanHost<'static>,
466    #[allow(dead_code, reason = "leaked capabilities anchor the session borrow")]
467    capabilities: &'static LeanCapabilities<'static, 'static>,
468    session: LeanSession<'static, 'static>,
469}
470
471impl HostSessionState {
472    fn open(
473        runtime: &'static LeanRuntime,
474        project_root: &str,
475        package: &str,
476        lib_name: &str,
477        imports: &[String],
478    ) -> LeanResult<Self> {
479        let host = Box::leak(Box::new(LeanHost::from_lake_project(runtime, Path::new(project_root))?));
480        let capabilities = Box::leak(Box::new(host.load_capabilities(package, lib_name)?));
481        let import_refs: Vec<&str> = imports.iter().map(String::as_str).collect();
482        let session = capabilities.session(&import_refs, None, None)?;
483        Ok(Self {
484            host,
485            capabilities,
486            session,
487        })
488    }
489
490    fn elaborate(&mut self, source: &str, options: &LeanWorkerElabOptions) -> LeanResult<LeanWorkerElabResult> {
491        let options = options.to_host_options();
492        let outcome = self.session.elaborate(source, None, &options, None)?;
493        Ok(match outcome {
494            Ok(_expr) => LeanWorkerElabResult {
495                success: true,
496                diagnostics: Vec::new(),
497                truncated: false,
498            },
499            Err(failure) => elab_failure_outcome(&failure),
500        })
501    }
502
503    fn kernel_check(
504        &mut self,
505        source: &str,
506        options: &LeanWorkerElabOptions,
507        progress: bool,
508        writer: &ProtocolWriter,
509    ) -> LeanResult<LeanWorkerKernelResult> {
510        if progress {
511            emit_progress(writer, "kernel_check", 0, Some(1));
512        }
513        let options = options.to_host_options();
514        let outcome = self.session.kernel_check(source, &options, None, None)?;
515        if progress {
516            emit_progress(writer, "kernel_check", 1, Some(1));
517        }
518        Ok(match outcome {
519            LeanKernelOutcome::Checked(evidence) => {
520                let summary = self.session.summarize_evidence(&evidence, None)?;
521                LeanWorkerKernelResult {
522                    status: LeanWorkerKernelStatus::Checked,
523                    diagnostics: Vec::new(),
524                    truncated: false,
525                    summary: Some(LeanWorkerKernelSummary {
526                        declaration_name: summary.declaration_name().to_owned(),
527                        kind: summary.kind().to_owned(),
528                        type_signature: summary.type_signature().to_owned(),
529                    }),
530                }
531            }
532            LeanKernelOutcome::Rejected(failure) => kernel_failure_outcome(LeanWorkerKernelStatus::Rejected, &failure),
533            LeanKernelOutcome::Unavailable(failure) => {
534                kernel_failure_outcome(LeanWorkerKernelStatus::Unavailable, &failure)
535            }
536            LeanKernelOutcome::Unsupported(failure) => {
537                kernel_failure_outcome(LeanWorkerKernelStatus::Unsupported, &failure)
538            }
539        })
540    }
541
542    fn declaration_kinds(
543        &mut self,
544        names: &[String],
545        progress: bool,
546        writer: &ProtocolWriter,
547    ) -> LeanResult<Vec<String>> {
548        if progress {
549            let total = Some(u64::try_from(names.len()).unwrap_or(u64::MAX));
550            let mut out = Vec::with_capacity(names.len());
551            for (idx, name) in names.iter().enumerate() {
552                out.push(self.session.declaration_kind(name, None)?);
553                emit_progress(
554                    writer,
555                    "declaration_kind_bulk",
556                    u64::try_from(idx.saturating_add(1)).unwrap_or(u64::MAX),
557                    total,
558                );
559            }
560            Ok(out)
561        } else {
562            let refs: Vec<&str> = names.iter().map(String::as_str).collect();
563            self.session.declaration_kind_bulk(&refs, None, None)
564        }
565    }
566
567    fn declaration_names(
568        &mut self,
569        names: &[String],
570        progress: bool,
571        writer: &ProtocolWriter,
572    ) -> LeanResult<Vec<String>> {
573        if progress {
574            let total = Some(u64::try_from(names.len()).unwrap_or(u64::MAX));
575            let mut out = Vec::with_capacity(names.len());
576            for (idx, name) in names.iter().enumerate() {
577                out.push(self.session.declaration_name(name, None)?);
578                emit_progress(
579                    writer,
580                    "declaration_name_bulk",
581                    u64::try_from(idx.saturating_add(1)).unwrap_or(u64::MAX),
582                    total,
583                );
584            }
585            Ok(out)
586        } else {
587            let refs: Vec<&str> = names.iter().map(String::as_str).collect();
588            self.session.declaration_name_bulk(&refs, None, None)
589        }
590    }
591
592    fn run_data_stream(
593        &mut self,
594        export: &str,
595        request_json: &str,
596        progress: bool,
597        writer: &ProtocolWriter,
598    ) -> Result<StreamSummary, StreamRunError> {
599        if progress {
600            emit_progress(writer, "data_stream", 0, None);
601        }
602
603        let started = Instant::now();
604        let forwarder = Arc::new(Mutex::new(StreamForwarder::new(writer.clone(), progress)));
605        let row_error = Arc::new(Mutex::new(None::<StreamCallbackError>));
606        let callback_forwarder = Arc::clone(&forwarder);
607        let callback_error = Arc::clone(&row_error);
608        let callback = LeanCallbackHandle::<LeanStringEvent>::register(move |event| {
609            if callback_error.lock().map_or(true, |guard| guard.is_some()) {
610                return LeanCallbackFlow::Stop;
611            }
612            match parse_row_envelope(&event.value) {
613                Ok(StreamCallbackEvent::Row(row)) => match callback_forwarder.lock() {
614                    Ok(mut guard) => match guard.emit_row(row) {
615                        Ok(()) => LeanCallbackFlow::Continue,
616                        Err(err) => {
617                            if let Ok(mut guard) = callback_error.lock() {
618                                *guard = Some(StreamCallbackError::Write(err.to_string()));
619                            }
620                            LeanCallbackFlow::Stop
621                        }
622                    },
623                    Err(_) => {
624                        if let Ok(mut guard) = callback_error.lock() {
625                            *guard = Some(StreamCallbackError::Malformed(
626                                "stream forwarder mutex was poisoned".to_owned(),
627                            ));
628                        }
629                        LeanCallbackFlow::Stop
630                    }
631                },
632                Ok(StreamCallbackEvent::Diagnostic(diagnostic)) => match callback_forwarder.lock() {
633                    Ok(guard) => match guard.emit_diagnostic(diagnostic) {
634                        Ok(()) => LeanCallbackFlow::Continue,
635                        Err(err) => {
636                            if let Ok(mut guard) = callback_error.lock() {
637                                *guard = Some(StreamCallbackError::Write(err.to_string()));
638                            }
639                            LeanCallbackFlow::Stop
640                        }
641                    },
642                    Err(_) => {
643                        if let Ok(mut guard) = callback_error.lock() {
644                            *guard = Some(StreamCallbackError::Malformed(
645                                "stream forwarder mutex was poisoned".to_owned(),
646                            ));
647                        }
648                        LeanCallbackFlow::Stop
649                    }
650                },
651                Ok(StreamCallbackEvent::Progress(progress)) => match callback_forwarder.lock() {
652                    Ok(guard) => match guard.emit_progress(progress) {
653                        Ok(()) => LeanCallbackFlow::Continue,
654                        Err(err) => {
655                            if let Ok(mut guard) = callback_error.lock() {
656                                *guard = Some(StreamCallbackError::Write(err.to_string()));
657                            }
658                            LeanCallbackFlow::Stop
659                        }
660                    },
661                    Err(_) => {
662                        if let Ok(mut guard) = callback_error.lock() {
663                            *guard = Some(StreamCallbackError::Malformed(
664                                "stream forwarder mutex was poisoned".to_owned(),
665                            ));
666                        }
667                        LeanCallbackFlow::Stop
668                    }
669                },
670                Ok(StreamCallbackEvent::Metadata(metadata)) => match callback_forwarder.lock() {
671                    Ok(mut guard) => {
672                        guard.set_metadata(metadata);
673                        LeanCallbackFlow::Continue
674                    }
675                    Err(_) => {
676                        if let Ok(mut guard) = callback_error.lock() {
677                            *guard = Some(StreamCallbackError::Malformed(
678                                "stream forwarder mutex was poisoned".to_owned(),
679                            ));
680                        }
681                        LeanCallbackFlow::Stop
682                    }
683                },
684                Err(message) => {
685                    if let Ok(mut guard) = callback_error.lock() {
686                        *guard = Some(StreamCallbackError::Malformed(message));
687                    }
688                    LeanCallbackFlow::Stop
689                }
690            }
691        })
692        .map_err(StreamRunError::Host)?;
693
694        let (handle, trampoline) = callback.abi_parts();
695        let status = self
696            .session
697            .call_capability::<(&str, usize, usize), LeanIo<u8>>(export, (request_json, handle, trampoline), None)
698            .map_err(StreamRunError::Host)?;
699
700        if let Some(error) = row_error.lock().ok().and_then(|mut guard| guard.take()) {
701            return Err(match error {
702                StreamCallbackError::Malformed(message) => StreamRunError::MalformedRow(message),
703                StreamCallbackError::Write(message) => {
704                    StreamRunError::Host(host_internal(format!("worker stream frame write failed: {message}")))
705                }
706            });
707        }
708
709        match LeanCallbackStatus::from_abi(status) {
710            Some(LeanCallbackStatus::Ok) => {}
711            Some(status) => return Err(StreamRunError::CallbackStatus(status)),
712            None => return Err(StreamRunError::ExportStatus(status)),
713        }
714
715        let guard = forwarder
716            .lock()
717            .map_err(|_| StreamRunError::MalformedRow("stream forwarder mutex was poisoned".to_owned()))?;
718        Ok(guard.summary(started.elapsed()))
719    }
720
721    fn capability_metadata(
722        &mut self,
723        export: &str,
724        request_json: &str,
725    ) -> Result<LeanWorkerCapabilityMetadata, CapabilityJsonError> {
726        let raw = self
727            .session
728            .call_capability::<(&str,), LeanIo<String>>(export, (request_json,), None)
729            .map_err(CapabilityJsonError::Host)?;
730        serde_json::from_str(&raw).map_err(|err| CapabilityJsonError::Malformed(err.to_string()))
731    }
732
733    fn capability_doctor(
734        &mut self,
735        export: &str,
736        request_json: &str,
737    ) -> Result<LeanWorkerDoctorReport, CapabilityJsonError> {
738        let raw = self
739            .session
740            .call_capability::<(&str,), LeanIo<String>>(export, (request_json,), None)
741            .map_err(CapabilityJsonError::Host)?;
742        serde_json::from_str(&raw).map_err(|err| CapabilityJsonError::Malformed(err.to_string()))
743    }
744
745    fn json_command(&mut self, export: &str, request_json: &str) -> LeanResult<String> {
746        self.session
747            .call_capability::<(&str,), LeanIo<String>>(export, (request_json,), None)
748    }
749
750    fn infer_type(
751        &mut self,
752        source: &str,
753        options: &LeanWorkerElabOptions,
754    ) -> LeanResult<LeanWorkerMetaResult<LeanWorkerRendered>> {
755        let elab_options = options.to_host_options();
756        let elab_outcome = self.session.elaborate(source, None, &elab_options, None)?;
757        let expr = match elab_outcome {
758            Ok(expr) => expr,
759            Err(failure) => return Ok(meta_failure_from_elab(&failure)),
760        };
761        let meta_options = options.to_host_meta_options(LeanMetaTransparency::Default);
762        let response = self.session.run_meta(&meta::infer_type(), expr, &meta_options, None)?;
763        meta_render_expr(&mut self.session, response, &meta_options)
764    }
765
766    fn whnf(
767        &mut self,
768        source: &str,
769        options: &LeanWorkerElabOptions,
770    ) -> LeanResult<LeanWorkerMetaResult<LeanWorkerRendered>> {
771        let elab_options = options.to_host_options();
772        let elab_outcome = self.session.elaborate(source, None, &elab_options, None)?;
773        let expr = match elab_outcome {
774            Ok(expr) => expr,
775            Err(failure) => return Ok(meta_failure_from_elab(&failure)),
776        };
777        let meta_options = options.to_host_meta_options(LeanMetaTransparency::Default);
778        let response = self.session.run_meta(&meta::whnf(), expr, &meta_options, None)?;
779        meta_render_expr(&mut self.session, response, &meta_options)
780    }
781
782    fn is_def_eq(
783        &mut self,
784        lhs: &str,
785        rhs: &str,
786        transparency: LeanWorkerMetaTransparency,
787        options: &LeanWorkerElabOptions,
788    ) -> LeanResult<LeanWorkerMetaResult<bool>> {
789        let elab_options = options.to_host_options();
790        let lhs_outcome = self.session.elaborate(lhs, None, &elab_options, None)?;
791        let lhs_expr = match lhs_outcome {
792            Ok(expr) => expr,
793            Err(failure) => return Ok(meta_failure_from_elab(&failure)),
794        };
795        let rhs_outcome = self.session.elaborate(rhs, None, &elab_options, None)?;
796        let rhs_expr = match rhs_outcome {
797            Ok(expr) => expr,
798            Err(failure) => return Ok(meta_failure_from_elab(&failure)),
799        };
800        let transparency_host = transparency.into();
801        let meta_options = options.to_host_meta_options(transparency_host);
802        let response = self.session.run_meta(
803            &meta::is_def_eq(),
804            (lhs_expr, rhs_expr, transparency_host),
805            &meta_options,
806            None,
807        )?;
808        match response {
809            LeanMetaResponse::Ok(value) => Ok(LeanWorkerMetaResult::Ok { value }),
810            LeanMetaResponse::Failed(failure) => Ok(LeanWorkerMetaResult::Failed {
811                failure: elab_failure_wire(&failure),
812            }),
813            LeanMetaResponse::TimeoutOrHeartbeat(failure) => Ok(LeanWorkerMetaResult::TimeoutOrHeartbeat {
814                failure: elab_failure_wire(&failure),
815            }),
816            LeanMetaResponse::Unsupported(failure) => Ok(LeanWorkerMetaResult::Unsupported {
817                failure: elab_failure_wire(&failure),
818            }),
819        }
820    }
821
822    fn describe(&mut self, name: &str) -> LeanResult<Option<LeanWorkerDeclarationRow>> {
823        let kind = self.session.declaration_kind(name, None)?;
824        if kind == "missing" {
825            return Ok(None);
826        }
827        let type_signature = match self.session.declaration_type(name, None)? {
828            Some(expr) => Some(self.session.expr_to_string_raw(&expr, None)?),
829            None => None,
830        };
831        let source = self
832            .session
833            .declaration_source_range(name, None)?
834            .map(source_range_wire);
835        Ok(Some(LeanWorkerDeclarationRow {
836            name: name.to_owned(),
837            kind,
838            type_signature,
839            source,
840        }))
841    }
842
843    fn list_declarations_strings(
844        &mut self,
845        filter: LeanWorkerDeclarationFilter,
846        progress: bool,
847        writer: &ProtocolWriter,
848    ) -> LeanResult<u64> {
849        let host_filter = LeanDeclarationFilter {
850            include_private: filter.include_private,
851            include_generated: filter.include_generated,
852            include_internal: filter.include_internal,
853        };
854        if progress {
855            emit_progress(writer, "list_declarations_strings", 0, None);
856        }
857        let names = self.session.list_declarations_strings(&host_filter, None, None)?;
858        let total = u64::try_from(names.len()).unwrap_or(u64::MAX);
859        let mut emitter = DataRowEmitter::default();
860        for name in names {
861            let payload = serde_json::value::to_raw_value(&name)
862                .map_err(|err| host_internal(format!("list_declarations_strings row payload encode failed: {err}")))?;
863            let row = emitter.next("rows", payload);
864            writer
865                .write(Message::DataRow(row))
866                .map_err(|err| host_internal(format!("list_declarations_strings row frame write failed: {err}")))?;
867        }
868        if progress {
869            emit_progress(writer, "list_declarations_strings", total, Some(total));
870        }
871        Ok(emitter.count())
872    }
873
874    fn describe_bulk(
875        &mut self,
876        names: &[String],
877        progress: bool,
878        writer: &ProtocolWriter,
879    ) -> LeanResult<Vec<LeanWorkerDeclarationRow>> {
880        let refs: Vec<&str> = names.iter().map(String::as_str).collect();
881        let kinds = self.session.declaration_kind_bulk(&refs, None, None)?;
882        let types = self.session.declaration_type_bulk(&refs, None, None)?;
883        let total = Some(u64::try_from(names.len()).unwrap_or(u64::MAX));
884        let mut rows = Vec::with_capacity(names.len());
885        for (idx, name) in names.iter().enumerate() {
886            let kind = kinds.get(idx).cloned().unwrap_or_else(|| "missing".to_owned());
887            let row = if kind == "missing" {
888                LeanWorkerDeclarationRow {
889                    name: name.clone(),
890                    kind,
891                    type_signature: None,
892                    source: None,
893                }
894            } else {
895                let type_signature = match types.get(idx).and_then(Option::as_ref) {
896                    Some(expr) => Some(self.session.expr_to_string_raw(expr, None)?),
897                    None => None,
898                };
899                let source = self
900                    .session
901                    .declaration_source_range(name, None)?
902                    .map(source_range_wire);
903                LeanWorkerDeclarationRow {
904                    name: name.clone(),
905                    kind,
906                    type_signature,
907                    source,
908                }
909            };
910            rows.push(row);
911            if progress {
912                emit_progress(
913                    writer,
914                    "describe_bulk",
915                    u64::try_from(idx.saturating_add(1)).unwrap_or(u64::MAX),
916                    total,
917                );
918            }
919        }
920        Ok(rows)
921    }
922
923    fn process_file(
924        &mut self,
925        source: &str,
926        options: &LeanWorkerElabOptions,
927    ) -> LeanResult<LeanWorkerProcessFileOutcome> {
928        let options = options.to_host_options();
929        Ok(match self.session.process_with_info_tree(source, &options, None)? {
930            ProcessFileOutcome::Processed(file) => LeanWorkerProcessFileOutcome::Processed {
931                file: processed_file_wire(file),
932            },
933            ProcessFileOutcome::Unsupported => LeanWorkerProcessFileOutcome::Unsupported,
934        })
935    }
936
937    fn process_module(
938        &mut self,
939        source: &str,
940        options: &LeanWorkerElabOptions,
941    ) -> LeanResult<LeanWorkerProcessModuleOutcome> {
942        let options = options.to_host_options();
943        Ok(
944            match self.session.process_module_with_info_tree(source, &options, None)? {
945                ProcessModuleOutcome::Ok { file, imports } => LeanWorkerProcessModuleOutcome::Ok {
946                    file: processed_file_wire(file),
947                    imports,
948                },
949                ProcessModuleOutcome::MissingImports { file, imports, missing } => {
950                    LeanWorkerProcessModuleOutcome::MissingImports {
951                        file: processed_file_wire(file),
952                        imports,
953                        missing,
954                    }
955                }
956                ProcessModuleOutcome::HeaderParseFailed { diagnostics } => {
957                    LeanWorkerProcessModuleOutcome::HeaderParseFailed {
958                        diagnostics: elab_failure_wire(&diagnostics),
959                    }
960                }
961                ProcessModuleOutcome::Unsupported => LeanWorkerProcessModuleOutcome::Unsupported,
962            },
963        )
964    }
965}
966
967#[derive(Clone, Debug)]
968struct PendingDataRow {
969    stream: String,
970    payload: Box<RawValue>,
971}
972
973enum StreamCallbackEvent {
974    Row(PendingDataRow),
975    Diagnostic(Diagnostic),
976    Progress(ProgressTick),
977    Metadata(serde_json::Value),
978}
979
980enum StreamCallbackError {
981    Malformed(String),
982    Write(String),
983}
984
985struct StreamForwarder {
986    writer: ProtocolWriter,
987    emitter: DataRowEmitter,
988    progress: bool,
989    metadata: Option<serde_json::Value>,
990}
991
992impl StreamForwarder {
993    fn new(writer: ProtocolWriter, progress: bool) -> Self {
994        Self {
995            writer,
996            emitter: DataRowEmitter::default(),
997            progress,
998            metadata: None,
999        }
1000    }
1001
1002    fn emit_row(&mut self, row: PendingDataRow) -> Result<(), ProtocolError> {
1003        let row = self.emitter.next(row.stream, row.payload);
1004        self.writer.write(Message::DataRow(row))?;
1005        if self.progress {
1006            emit_progress(&self.writer, "data_stream", self.emitter.count(), None);
1007        }
1008        Ok(())
1009    }
1010
1011    fn emit_diagnostic(&self, diagnostic: Diagnostic) -> Result<(), ProtocolError> {
1012        self.writer.write(Message::Diagnostic(diagnostic))
1013    }
1014
1015    fn emit_progress(&self, progress: ProgressTick) -> Result<(), ProtocolError> {
1016        self.writer.write(Message::ProgressTick(progress))
1017    }
1018
1019    fn set_metadata(&mut self, metadata: serde_json::Value) {
1020        self.metadata = Some(metadata);
1021    }
1022
1023    fn summary(&self, elapsed: std::time::Duration) -> StreamSummary {
1024        StreamSummary::new(
1025            self.emitter.count(),
1026            self.emitter.per_stream_counts(),
1027            elapsed,
1028            self.metadata.clone(),
1029        )
1030    }
1031}
1032
1033#[derive(Debug)]
1034enum StreamRunError {
1035    Host(LeanError),
1036    ExportStatus(u8),
1037    CallbackStatus(LeanCallbackStatus),
1038    MalformedRow(String),
1039}
1040
1041enum CapabilityJsonError {
1042    Host(LeanError),
1043    Malformed(String),
1044}
1045
1046impl From<crate::protocol::ProtocolError> for StreamRunError {
1047    fn from(value: crate::protocol::ProtocolError) -> Self {
1048        Self::Host(host_internal(format!("worker data-row frame write failed: {value}")))
1049    }
1050}
1051
1052fn parse_row_envelope(raw: &str) -> Result<StreamCallbackEvent, String> {
1053    let envelope: RowCallbackEnvelope =
1054        serde_json::from_str(raw).map_err(|err| format!("row callback payload is not valid JSON: {err}"))?;
1055    if let Some(diagnostic) = envelope.diagnostic {
1056        let code = diagnostic
1057            .code
1058            .filter(|value| !value.is_empty())
1059            .ok_or_else(|| "diagnostic callback payload must contain a non-empty string field `code`".to_owned())?;
1060        let message = diagnostic
1061            .message
1062            .ok_or_else(|| "diagnostic callback payload must contain a string field `message`".to_owned())?;
1063        return Ok(StreamCallbackEvent::Diagnostic(Diagnostic { code, message }));
1064    }
1065    if let Some(progress) = envelope.progress {
1066        let phase = progress
1067            .phase
1068            .filter(|value| !value.is_empty())
1069            .ok_or_else(|| "progress callback payload must contain a non-empty string field `phase`".to_owned())?;
1070        return Ok(StreamCallbackEvent::Progress(ProgressTick {
1071            phase,
1072            current: progress.current,
1073            total: progress.total,
1074        }));
1075    }
1076    if let Some(metadata) = envelope.metadata {
1077        let metadata = serde_json::from_str(metadata.get())
1078            .map_err(|err| format!("metadata callback payload is not valid JSON: {err}"))?;
1079        return Ok(StreamCallbackEvent::Metadata(metadata));
1080    }
1081    let stream = envelope
1082        .stream
1083        .filter(|value| !value.is_empty())
1084        .ok_or_else(|| "row callback payload must contain a non-empty string field `stream`".to_owned())?;
1085    let payload = envelope
1086        .payload
1087        .ok_or_else(|| "row callback payload must contain field `payload`".to_owned())?;
1088    Ok(StreamCallbackEvent::Row(PendingDataRow { stream, payload }))
1089}
1090
1091#[derive(Deserialize)]
1092struct RowCallbackEnvelope {
1093    stream: Option<String>,
1094    payload: Option<Box<RawValue>>,
1095    diagnostic: Option<RowCallbackDiagnostic>,
1096    progress: Option<RowCallbackProgress>,
1097    metadata: Option<Box<RawValue>>,
1098}
1099
1100#[derive(Deserialize)]
1101struct RowCallbackDiagnostic {
1102    code: Option<String>,
1103    message: Option<String>,
1104}
1105
1106#[derive(Deserialize)]
1107struct RowCallbackProgress {
1108    phase: Option<String>,
1109    current: u64,
1110    total: Option<u64>,
1111}
1112
1113impl LeanWorkerElabOptions {
1114    fn to_host_options(&self) -> LeanElabOptions {
1115        LeanElabOptions::new()
1116            .namespace_context(&self.namespace_context)
1117            .file_label(&self.file_label)
1118            .heartbeat_limit(self.heartbeat_limit)
1119            .diagnostic_byte_limit(self.diagnostic_byte_limit)
1120    }
1121
1122    fn to_host_meta_options(&self, transparency: LeanMetaTransparency) -> LeanMetaOptions {
1123        LeanMetaOptions::new()
1124            .namespace_context(&self.namespace_context)
1125            .heartbeat_limit(self.heartbeat_limit)
1126            .diagnostic_byte_limit(self.diagnostic_byte_limit)
1127            .transparency(transparency)
1128    }
1129}
1130
1131impl From<LeanWorkerMetaTransparency> for LeanMetaTransparency {
1132    fn from(value: LeanWorkerMetaTransparency) -> Self {
1133        match value {
1134            LeanWorkerMetaTransparency::Default => Self::Default,
1135            LeanWorkerMetaTransparency::Reducible => Self::Reducible,
1136            LeanWorkerMetaTransparency::Instances => Self::Instances,
1137            LeanWorkerMetaTransparency::All => Self::All,
1138        }
1139    }
1140}
1141
1142fn elab_failure_wire(failure: &LeanElabFailure) -> LeanWorkerElabFailure {
1143    LeanWorkerElabFailure {
1144        diagnostics: diagnostics(failure),
1145        truncated: failure.truncated(),
1146    }
1147}
1148
1149fn meta_failure_from_elab<T>(failure: &LeanElabFailure) -> LeanWorkerMetaResult<T> {
1150    LeanWorkerMetaResult::Failed {
1151        failure: elab_failure_wire(failure),
1152    }
1153}
1154
1155fn meta_render_expr(
1156    session: &mut LeanSession<'static, 'static>,
1157    response: LeanMetaResponse<lean_rs::LeanExpr<'static>>,
1158    meta_options: &LeanMetaOptions,
1159) -> LeanResult<LeanWorkerMetaResult<LeanWorkerRendered>> {
1160    let expr = match response {
1161        LeanMetaResponse::Ok(expr) => expr,
1162        LeanMetaResponse::Failed(failure) => {
1163            return Ok(LeanWorkerMetaResult::Failed {
1164                failure: elab_failure_wire(&failure),
1165            });
1166        }
1167        LeanMetaResponse::TimeoutOrHeartbeat(failure) => {
1168            return Ok(LeanWorkerMetaResult::TimeoutOrHeartbeat {
1169                failure: elab_failure_wire(&failure),
1170            });
1171        }
1172        LeanMetaResponse::Unsupported(failure) => {
1173            return Ok(LeanWorkerMetaResult::Unsupported {
1174                failure: elab_failure_wire(&failure),
1175            });
1176        }
1177    };
1178    let pp_response = session.run_meta(&meta::pp_expr(), expr.clone(), meta_options, None)?;
1179    Ok(match pp_response {
1180        LeanMetaResponse::Ok(rendered) => LeanWorkerMetaResult::Ok {
1181            value: LeanWorkerRendered {
1182                value: rendered,
1183                rendering: LeanWorkerRendering::Pretty,
1184            },
1185        },
1186        LeanMetaResponse::Unsupported(_) => LeanWorkerMetaResult::Ok {
1187            value: LeanWorkerRendered {
1188                value: session.expr_to_string_raw(&expr, None)?,
1189                rendering: LeanWorkerRendering::Raw,
1190            },
1191        },
1192        LeanMetaResponse::Failed(failure) => LeanWorkerMetaResult::Failed {
1193            failure: elab_failure_wire(&failure),
1194        },
1195        LeanMetaResponse::TimeoutOrHeartbeat(failure) => LeanWorkerMetaResult::TimeoutOrHeartbeat {
1196            failure: elab_failure_wire(&failure),
1197        },
1198    })
1199}
1200
1201fn source_range_wire(range: LeanSourceRange) -> LeanWorkerSourceRange {
1202    LeanWorkerSourceRange {
1203        file: range.file,
1204        start_line: range.start_line,
1205        start_column: range.start_column,
1206        end_line: range.end_line,
1207        end_column: range.end_column,
1208    }
1209}
1210
1211fn command_info_wire(node: CommandInfoNode) -> LeanWorkerCommandInfo {
1212    LeanWorkerCommandInfo {
1213        start_line: node.start_line,
1214        start_column: node.start_column,
1215        end_line: node.end_line,
1216        end_column: node.end_column,
1217        decl_name: node.decl_name,
1218    }
1219}
1220
1221fn term_info_wire(node: TermInfoNode) -> LeanWorkerTermInfo {
1222    LeanWorkerTermInfo {
1223        start_line: node.start_line,
1224        start_column: node.start_column,
1225        end_line: node.end_line,
1226        end_column: node.end_column,
1227        expr_str: node.expr_str,
1228        type_str: node.type_str,
1229        expected_type_str: node.expected_type_str,
1230    }
1231}
1232
1233fn tactic_info_wire(node: TacticInfoNode) -> LeanWorkerTacticInfo {
1234    LeanWorkerTacticInfo {
1235        start_line: node.start_line,
1236        start_column: node.start_column,
1237        end_line: node.end_line,
1238        end_column: node.end_column,
1239        goals_before: node.goals_before,
1240        goals_after: node.goals_after,
1241    }
1242}
1243
1244fn name_ref_wire(node: NameRefNode) -> LeanWorkerNameRef {
1245    LeanWorkerNameRef {
1246        start_line: node.start_line,
1247        start_column: node.start_column,
1248        end_line: node.end_line,
1249        end_column: node.end_column,
1250        name: node.name,
1251        is_binder: node.is_binder,
1252    }
1253}
1254
1255fn processed_file_wire(file: ProcessedFile) -> LeanWorkerProcessedFile {
1256    LeanWorkerProcessedFile {
1257        commands: file.commands.into_iter().map(command_info_wire).collect(),
1258        terms: file.terms.into_iter().map(term_info_wire).collect(),
1259        tactics: file.tactics.into_iter().map(tactic_info_wire).collect(),
1260        names: file.names.into_iter().map(name_ref_wire).collect(),
1261        diagnostics: elab_failure_wire(&file.diagnostics),
1262    }
1263}
1264
1265fn elab_failure_outcome(failure: &LeanElabFailure) -> LeanWorkerElabResult {
1266    LeanWorkerElabResult {
1267        success: false,
1268        diagnostics: diagnostics(failure),
1269        truncated: failure.truncated(),
1270    }
1271}
1272
1273fn kernel_failure_outcome(status: LeanWorkerKernelStatus, failure: &LeanElabFailure) -> LeanWorkerKernelResult {
1274    LeanWorkerKernelResult {
1275        status,
1276        diagnostics: diagnostics(failure),
1277        truncated: failure.truncated(),
1278        summary: None,
1279    }
1280}
1281
1282fn diagnostics(failure: &LeanElabFailure) -> Vec<LeanWorkerDiagnostic> {
1283    failure
1284        .diagnostics()
1285        .iter()
1286        .map(|diagnostic| {
1287            let (line, column, end_line, end_column) =
1288                diagnostic.position().map_or((None, None, None, None), |position| {
1289                    (
1290                        Some(position.line()),
1291                        Some(position.column()),
1292                        position.end_line(),
1293                        position.end_column(),
1294                    )
1295                });
1296            LeanWorkerDiagnostic {
1297                severity: match diagnostic.severity() {
1298                    LeanSeverity::Info => "info",
1299                    LeanSeverity::Warning => "warning",
1300                    LeanSeverity::Error => "error",
1301                }
1302                .to_owned(),
1303                message: diagnostic.message().to_owned(),
1304                file_label: diagnostic.file_label().to_owned(),
1305                line,
1306                column,
1307                end_line,
1308                end_column,
1309            }
1310        })
1311        .collect()
1312}
1313
1314fn emit_progress(writer: &ProtocolWriter, phase: &str, current: u64, total: Option<u64>) {
1315    drop(writer.write(Message::ProgressTick(ProgressTick {
1316        phase: phase.to_owned(),
1317        current,
1318        total,
1319    })));
1320}
1321
1322fn emit_test_rows(writer: &ProtocolWriter, streams: &[String]) -> Result<u64, crate::protocol::ProtocolError> {
1323    let mut emitter = DataRowEmitter::default();
1324    for (idx, stream) in streams.iter().enumerate() {
1325        let payload = serde_json::value::to_raw_value(&serde_json::json!({
1326            "stream": stream,
1327            "index": idx,
1328        }))?;
1329        let row = emitter.next(stream.clone(), payload);
1330        writer.write(Message::DataRow(row))?;
1331    }
1332    Ok(emitter.count())
1333}
1334
1335#[allow(dead_code, reason = "reserved for future worker configuration paths")]
1336fn _path_for_diagnostics(path: &Path) -> PathBuf {
1337    path.to_path_buf()
1338}