Skip to main content

lean_rs_worker/
session.rs

1//! Worker-host adapter over the process-boundary supervisor.
2//!
3//! This module is intentionally narrower than `lean-rs-host::LeanSession`.
4//! It exposes serializable outcomes that make sense across a child process:
5//! declaration text, elaboration diagnostics, and kernel-check status. Runtime
6//! handles such as `LeanExpr` and `LeanEvidence` stay inside the child.
7
8use std::collections::BTreeMap;
9use std::fmt;
10use std::marker::PhantomData;
11use std::path::PathBuf;
12use std::sync::Arc;
13use std::sync::atomic::{AtomicBool, Ordering};
14use std::time::{Duration, Instant};
15
16use serde::Serialize;
17use serde::de::DeserializeOwned;
18use serde_json::Value;
19use serde_json::value::RawValue;
20
21use crate::protocol::{DataRow, Diagnostic, StreamSummary};
22use crate::supervisor::{LeanWorker, LeanWorkerError};
23use crate::types::{
24    LeanWorkerCapabilityMetadata, LeanWorkerDeclarationFilter, LeanWorkerDeclarationRow, LeanWorkerDoctorReport,
25    LeanWorkerElabOptions, LeanWorkerElabResult, LeanWorkerKernelResult, LeanWorkerMetaResult,
26    LeanWorkerMetaTransparency, LeanWorkerProcessFileOutcome, LeanWorkerProcessModuleOutcome, LeanWorkerRendered,
27};
28
/// Settings used to open a single host session inside a worker child.
///
/// Stores the project root, the package and library names of the Lake
/// capability, and the list of imports, all in owned form.
#[derive(Clone, Debug, Eq, PartialEq)]
pub struct LeanWorkerSessionConfig {
    project_root: PathBuf,
    package: String,
    lib_name: String,
    imports: Vec<String>,
}

impl LeanWorkerSessionConfig {
    /// Build a session configuration from a Lake capability and its imports.
    ///
    /// Every argument is converted to its owned representation here, so the
    /// resulting value does not borrow from the caller.
    pub fn new(
        project_root: impl Into<PathBuf>,
        package: impl Into<String>,
        lib_name: impl Into<String>,
        imports: impl IntoIterator<Item = impl Into<String>>,
    ) -> Self {
        // Conversions run in declaration order, matching the field order.
        let project_root: PathBuf = project_root.into();
        let package: String = package.into();
        let lib_name: String = lib_name.into();
        let imports: Vec<String> = imports.into_iter().map(|module| module.into()).collect();
        Self {
            project_root,
            package,
            lib_name,
            imports,
        }
    }

    /// Project root rendered as an owned `String` (lossy for non-UTF-8 paths).
    pub(crate) fn project_root_string(&self) -> String {
        let rendered = self.project_root.to_string_lossy();
        rendered.into_owned()
    }

    /// Package name supplied at construction.
    pub(crate) fn package(&self) -> &str {
        self.package.as_str()
    }

    /// Library name supplied at construction.
    pub(crate) fn lib_name(&self) -> &str {
        self.lib_name.as_str()
    }

    /// Import list supplied at construction, in the original order.
    pub(crate) fn imports(&self) -> &[String] {
        self.imports.as_slice()
    }
}
70
/// Protocol/runtime facts reported by the worker child during handshake.
///
/// These facts describe the `lean-rs-worker` process and framing contract.
/// They are separate from downstream capability metadata returned by
/// `LeanWorkerSession::capability_metadata`.
#[derive(Clone, Debug, Eq, PartialEq)]
pub struct LeanWorkerRuntimeMetadata {
    /// Version string the worker child reported for itself.
    pub worker_version: String,
    /// Protocol version number the worker child reported.
    pub protocol_version: u16,
    /// Lean version string, when the child reported one.
    /// NOTE(review): the conditions under which this is `None` are not
    /// visible in this file — confirm against the handshake code.
    pub lean_version: Option<String>,
}
82
/// Cancellation flag held by the parent for worker-session requests.
///
/// The supervisor checks the flag before a request is sent and again at
/// worker progress frames while the request is in flight. Cancelling an
/// in-flight request cycles the child process; no in-process
/// `LeanCancellationToken` is shared with the child.
///
/// Clones share one underlying flag.
#[derive(Clone, Debug, Default)]
pub struct LeanWorkerCancellationToken {
    cancelled: Arc<AtomicBool>,
}

impl LeanWorkerCancellationToken {
    /// Build a token whose flag starts out cleared.
    #[must_use]
    pub fn new() -> Self {
        Self {
            cancelled: Arc::new(AtomicBool::new(false)),
        }
    }

    /// Raise the cancellation flag; every clone of this token observes it.
    pub fn cancel(&self) {
        let flag: &AtomicBool = &self.cancelled;
        flag.store(true, Ordering::Release);
    }

    /// Report whether `cancel` has been called on this token or a clone.
    #[must_use]
    pub fn is_cancelled(&self) -> bool {
        let flag: &AtomicBool = &self.cancelled;
        flag.load(Ordering::Acquire)
    }
}
112
/// One progress event observed by the parent from a worker request.
#[derive(Clone, Debug, Eq, PartialEq)]
pub struct LeanWorkerProgressEvent {
    /// Phase label carried by the event.
    pub phase: String,
    /// Progress counter carried by the event.
    /// NOTE(review): presumably counts units completed within `phase` — confirm.
    pub current: u64,
    /// Optional total for `current`.
    /// NOTE(review): presumably `None` when the amount of work is unknown — confirm.
    pub total: Option<u64>,
    /// Elapsed time attached to the event.
    /// NOTE(review): not visible here whether this is measured by the child
    /// or by the parent — confirm.
    pub elapsed: Duration,
}
121
/// Parent-side sink for worker progress events.
///
/// Implementations must be `Send + Sync`. Session methods in this module
/// document a panicking progress sink as one of their `LeanWorkerError`
/// conditions.
pub trait LeanWorkerProgressSink: Send + Sync {
    /// Receive one owned progress event.
    fn report(&self, event: LeanWorkerProgressEvent);
}
126
/// One downstream-owned JSON row delivered over a worker request.
///
/// `stream` is a caller-defined channel name. `sequence` starts at zero per
/// stream inside one request and is assigned by `lean-rs-worker`. `payload` is
/// owned JSON; callers may keep it after `LeanWorkerDataSink::report` returns.
#[derive(Clone, Debug, Eq, PartialEq)]
pub struct LeanWorkerDataRow {
    /// Caller-defined channel name this row belongs to.
    pub stream: String,
    /// Position of this row within `stream` for the current request.
    pub sequence: u64,
    /// Parsed JSON payload, owned by the row.
    pub payload: Value,
}
138
139impl TryFrom<DataRow> for LeanWorkerDataRow {
140    type Error = LeanWorkerError;
141
142    fn try_from(value: DataRow) -> Result<Self, Self::Error> {
143        let payload = serde_json::from_str(value.payload.get()).map_err(|err| LeanWorkerError::Protocol {
144            message: format!("worker data-row payload decode failed: {err}"),
145        })?;
146        Ok(Self {
147            stream: value.stream,
148            sequence: value.sequence,
149            payload,
150        })
151    }
152}
153
/// Parent-side sink for downstream data rows produced by one worker request.
///
/// A sink is borrowed for one request. It receives owned rows and may store
/// them. If `report` panics, the supervisor catches the panic and returns
/// `LeanWorkerError::DataSinkPanic`.
///
/// `report` takes `&self`, so a sink that accumulates rows needs interior
/// mutability.
pub trait LeanWorkerDataSink: Send + Sync {
    /// Receive one owned row; the row may be kept after this call returns.
    fn report(&self, row: LeanWorkerDataRow);
}
162
/// Crate-internal data row whose payload is kept as raw, unparsed JSON.
///
/// Carries the same `stream`/`sequence` pair as `LeanWorkerDataRow`, but
/// stores `payload` as `Box<RawValue>` instead of a parsed `Value`.
pub(crate) struct LeanWorkerRawDataRow {
    /// Stream name taken from the protocol row.
    pub(crate) stream: String,
    /// Sequence number taken from the protocol row.
    pub(crate) sequence: u64,
    /// Unparsed JSON payload taken from the protocol row.
    pub(crate) payload: Box<RawValue>,
}
168
169impl From<DataRow> for LeanWorkerRawDataRow {
170    fn from(value: DataRow) -> Self {
171        Self {
172            stream: value.stream,
173            sequence: value.sequence,
174            payload: value.payload,
175        }
176    }
177}
178
/// Crate-internal sink for rows whose JSON payload has not been parsed.
///
/// Counterpart of `LeanWorkerDataSink` for `LeanWorkerRawDataRow` values.
pub(crate) trait LeanWorkerRawDataSink: Send + Sync {
    /// Receive one owned raw row.
    fn report(&self, row: LeanWorkerRawDataRow);
}
182
/// Borrowed reference to the sink a request delivers data rows to.
///
/// The enum only holds shared references, which is why it can be `Copy`.
#[derive(Clone, Copy)]
pub(crate) enum LeanWorkerDataSinkTarget<'a> {
    /// Sink that receives rows with parsed `Value` payloads.
    Value(&'a dyn LeanWorkerDataSink),
    /// Sink that receives rows with unparsed `RawValue` payloads.
    Raw(&'a dyn LeanWorkerRawDataSink),
}
188
/// One diagnostic message delivered over a worker request.
///
/// Diagnostics are control/observability messages, not data rows. They are
/// delivered through `LeanWorkerDiagnosticSink` so row payloads remain
/// downstream-owned data.
#[derive(Clone, Debug, Eq, PartialEq)]
pub struct LeanWorkerDiagnosticEvent {
    /// Diagnostic code copied from the protocol diagnostic.
    /// NOTE(review): the set of possible codes is not visible in this file.
    pub code: String,
    /// Diagnostic message text copied from the protocol diagnostic.
    pub message: String,
}
199
200impl From<Diagnostic> for LeanWorkerDiagnosticEvent {
201    fn from(value: Diagnostic) -> Self {
202        Self {
203            code: value.code,
204            message: value.message,
205        }
206    }
207}
208
/// Parent-side sink for diagnostics produced by one worker request.
///
/// Implementations must be `Send + Sync`; `report` takes `&self`, so a sink
/// that stores diagnostics needs interior mutability.
pub trait LeanWorkerDiagnosticSink: Send + Sync {
    /// Receive one owned diagnostic event.
    fn report(&self, diagnostic: LeanWorkerDiagnosticEvent);
}
213
/// Summary returned after a worker data-stream export completes.
///
/// Rows delivered to `LeanWorkerDataSink` are tentative until this summary is
/// returned successfully. Downstream callers that need atomic commit should
/// buffer rows in their sink and commit only after terminal success.
#[derive(Clone, Debug, Eq, PartialEq)]
pub struct LeanWorkerStreamSummary {
    /// Total number of rows delivered to the parent before terminal success.
    pub total_rows: u64,
    /// Per-stream row counts assigned by `lean-rs-worker`, keyed by stream name.
    pub per_stream_counts: BTreeMap<String, u64>,
    /// Elapsed time measured in the child for the streaming export.
    ///
    /// Converted from the microsecond count carried by the protocol summary.
    pub elapsed: Duration,
    /// Optional downstream-defined terminal metadata.
    pub metadata: Option<Value>,
}
230
231impl From<StreamSummary> for LeanWorkerStreamSummary {
232    fn from(value: StreamSummary) -> Self {
233        Self {
234            total_rows: value.total_rows,
235            per_stream_counts: value.per_stream_counts,
236            elapsed: Duration::from_micros(value.elapsed_micros),
237            metadata: value.metadata,
238        }
239    }
240}
241
/// Typed handle for a downstream JSON command that returns a single value.
///
/// The handle names a Lean export with ABI `String -> IO String`. `Req` and
/// `Resp` are serde types owned by the downstream crate; `lean-rs-worker`
/// takes care of request transport, worker lifecycle, timeout, cancellation,
/// and response decoding.
///
/// The trait impls below are written by hand so that they place no bounds on
/// `Req` or `Resp`; only the export name takes part in cloning, formatting,
/// and comparison.
pub struct LeanWorkerJsonCommand<Req, Resp> {
    export: String,
    _types: PhantomData<fn(&Req) -> Resp>,
}

impl<Req, Resp> LeanWorkerJsonCommand<Req, Resp> {
    /// Build a typed JSON command bound to one Lean export name.
    #[must_use]
    pub fn new(export: impl Into<String>) -> Self {
        let export: String = export.into();
        Self {
            export,
            _types: PhantomData,
        }
    }

    /// Name of the Lean export this command invokes.
    #[must_use]
    pub fn export(&self) -> &str {
        self.export.as_str()
    }
}

impl<Req, Resp> Clone for LeanWorkerJsonCommand<Req, Resp> {
    /// Duplicate the command by copying its export name.
    fn clone(&self) -> Self {
        Self::new(self.export.clone())
    }
}

impl<Req, Resp> fmt::Debug for LeanWorkerJsonCommand<Req, Resp> {
    /// Format the command showing only the export name.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let mut builder = f.debug_struct("LeanWorkerJsonCommand");
        builder.field("export", &self.export);
        builder.finish()
    }
}

impl<Req, Resp> PartialEq for LeanWorkerJsonCommand<Req, Resp> {
    /// Two commands are equal when they name the same export.
    fn eq(&self, other: &Self) -> bool {
        self.export() == other.export()
    }
}

impl<Req, Resp> Eq for LeanWorkerJsonCommand<Req, Resp> {}
293
/// Typed handle for a downstream JSON command that streams rows.
///
/// The handle names a Lean export with ABI
/// `String -> USize -> USize -> IO UInt8`. `Req`, `Row`, and `Summary` are
/// serde types owned by the downstream crate. Row and terminal-summary JSON
/// are decoded at the parent boundary, once `lean-rs-worker` has dealt with
/// process lifecycle, framing, diagnostics, timeout, cancellation, and
/// completion.
///
/// The trait impls below are written by hand so that they place no bounds on
/// the type parameters; only the export name takes part in cloning,
/// formatting, and comparison.
pub struct LeanWorkerStreamingCommand<Req, Row, Summary> {
    export: String,
    _types: PhantomData<fn(&Req) -> (Row, Summary)>,
}

impl<Req, Row, Summary> LeanWorkerStreamingCommand<Req, Row, Summary> {
    /// Build a typed streaming command bound to one Lean export name.
    #[must_use]
    pub fn new(export: impl Into<String>) -> Self {
        let export: String = export.into();
        Self {
            export,
            _types: PhantomData,
        }
    }

    /// Name of the Lean export this command invokes.
    #[must_use]
    pub fn export(&self) -> &str {
        self.export.as_str()
    }
}

impl<Req, Row, Summary> Clone for LeanWorkerStreamingCommand<Req, Row, Summary> {
    /// Duplicate the command by copying its export name.
    fn clone(&self) -> Self {
        Self::new(self.export.clone())
    }
}

impl<Req, Row, Summary> fmt::Debug for LeanWorkerStreamingCommand<Req, Row, Summary> {
    /// Format the command showing only the export name.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let mut builder = f.debug_struct("LeanWorkerStreamingCommand");
        builder.field("export", &self.export);
        builder.finish()
    }
}

impl<Req, Row, Summary> PartialEq for LeanWorkerStreamingCommand<Req, Row, Summary> {
    /// Two commands are equal when they name the same export.
    fn eq(&self, other: &Self) -> bool {
        self.export() == other.export()
    }
}

impl<Req, Row, Summary> Eq for LeanWorkerStreamingCommand<Req, Row, Summary> {}
347
/// One typed downstream row decoded from a worker data row.
///
/// Same shape as `LeanWorkerDataRow`, except that `payload` holds the
/// caller's `Row` type instead of a JSON `Value`.
#[derive(Clone, Debug, Eq, PartialEq)]
pub struct LeanWorkerTypedDataRow<Row> {
    /// Stream name the row was delivered on.
    pub stream: String,
    /// Sequence number of the row within its stream.
    pub sequence: u64,
    /// Row payload in the caller's `Row` type.
    pub payload: Row,
}
355
/// Parent-side sink for typed downstream data rows produced by one command.
///
/// The sink remains request-local. A panic from `report` is contained by the
/// worker supervisor and returned as `LeanWorkerError::DataSinkPanic`.
///
/// `report` takes `&self`, so a sink that accumulates rows needs interior
/// mutability.
pub trait LeanWorkerTypedDataSink<Row>: Send + Sync {
    /// Receive one owned, already-decoded row.
    fn report(&self, row: LeanWorkerTypedDataRow<Row>);
}
363
/// Typed summary returned after a streaming command reaches terminal success.
///
/// Rows delivered to a typed sink remain tentative until this summary is
/// returned. `metadata` is decoded from the downstream terminal JSON metadata,
/// when the export provides it.
#[derive(Clone, Debug, Eq, PartialEq)]
pub struct LeanWorkerTypedStreamSummary<Summary> {
    /// Total row count, copied from the untyped stream summary.
    pub total_rows: u64,
    /// Row count per stream name, copied from the untyped stream summary.
    pub per_stream_counts: BTreeMap<String, u64>,
    /// Elapsed time, copied from the untyped stream summary.
    pub elapsed: Duration,
    /// Terminal metadata decoded into `Summary`; `None` when the export
    /// supplied no metadata.
    pub metadata: Option<Summary>,
}
376
/// Narrow host-session adapter over a live `LeanWorker`.
///
/// Dropping this value does not stop the worker. If a request is cancelled
/// while in flight, the supervisor cycles the child process and this session is
/// invalidated; open a fresh session before issuing more host requests.
pub struct LeanWorkerSession<'worker> {
    /// Exclusive borrow of the worker for the lifetime of the session.
    worker: &'worker mut LeanWorker,
    /// Set to `true` by `LeanWorker::open_session`.
    /// NOTE(review): presumably cleared when the session is invalidated; the
    /// code that does so is outside this view — confirm.
    open: bool,
}
386
387impl LeanWorker {
388    /// Open a host session inside the worker child.
389    ///
390    /// # Errors
391    ///
392    /// Returns `LeanWorkerError` if the worker is dead, the child cannot open
393    /// the Lake project/capability/imports, cancellation is already requested,
394    /// or protocol communication fails.
395    pub fn open_session<'worker>(
396        &'worker mut self,
397        config: &LeanWorkerSessionConfig,
398        cancellation: Option<&LeanWorkerCancellationToken>,
399        progress: Option<&dyn LeanWorkerProgressSink>,
400    ) -> Result<LeanWorkerSession<'worker>, LeanWorkerError> {
401        self.open_worker_session(config, cancellation, progress)?;
402        Ok(LeanWorkerSession {
403            worker: self,
404            open: true,
405        })
406    }
407}
408
409impl LeanWorkerSession<'_> {
    /// Return the timeout used for subsequent requests on this session.
    ///
    /// The value is read from the underlying `LeanWorker`; the session itself
    /// stores no timeout.
    #[must_use]
    pub fn request_timeout(&self) -> Duration {
        self.worker.request_timeout()
    }
415
    /// Change the timeout for subsequent requests on this session.
    ///
    /// A timeout is parent-enforced. If it fires, the supervisor kills and
    /// replaces the child process and invalidates this session.
    ///
    /// The new value is set on the underlying `LeanWorker`.
    /// NOTE(review): because the setting lives on the worker, it presumably
    /// stays in effect after this session is dropped — confirm.
    pub fn set_request_timeout(&mut self, timeout: Duration) {
        self.worker.set_request_timeout(timeout);
    }
423
    /// Elaborate one term and return only process-safe success/diagnostic data.
    ///
    /// `source` is the text to elaborate and `options` the elaboration
    /// options. Both are forwarded, together with the optional `cancellation`
    /// token and `progress` sink, to the worker's `worker_elaborate` request
    /// through `with_session`.
    ///
    /// # Errors
    ///
    /// Returns `LeanWorkerError` if the worker is dead, the child reports a
    /// host error, cancellation is observed, a progress sink panics, or protocol
    /// communication fails.
    pub fn elaborate(
        &mut self,
        source: &str,
        options: &LeanWorkerElabOptions,
        cancellation: Option<&LeanWorkerCancellationToken>,
        progress: Option<&dyn LeanWorkerProgressSink>,
    ) -> Result<LeanWorkerElabResult, LeanWorkerError> {
        self.with_session(|worker| worker.worker_elaborate(source, options, cancellation, progress))
    }
440
    /// Kernel-check one declaration and return only process-safe status/diagnostics.
    ///
    /// `source` is the declaration text and `options` the elaboration
    /// options. Both are forwarded, together with the optional `cancellation`
    /// token and `progress` sink, to the worker's `worker_kernel_check`
    /// request through `with_session`.
    ///
    /// # Errors
    ///
    /// Returns `LeanWorkerError` if the worker is dead, the child reports a
    /// host error, cancellation is observed, a progress sink panics, or protocol
    /// communication fails.
    pub fn kernel_check(
        &mut self,
        source: &str,
        options: &LeanWorkerElabOptions,
        cancellation: Option<&LeanWorkerCancellationToken>,
        progress: Option<&dyn LeanWorkerProgressSink>,
    ) -> Result<LeanWorkerKernelResult, LeanWorkerError> {
        self.with_session(|worker| worker.worker_kernel_check(source, options, cancellation, progress))
    }
457
    /// Query declaration kinds in bulk.
    ///
    /// `names` is forwarded, with the optional `cancellation` token and
    /// `progress` sink, to the worker's `worker_declaration_kinds` request
    /// through `with_session`.
    /// NOTE(review): the returned vector presumably holds one kind string per
    /// entry of `names`, in input order — confirm against the worker side.
    ///
    /// # Errors
    ///
    /// Returns `LeanWorkerError` if the worker is dead, the child reports a
    /// host error, cancellation is observed, a progress sink panics, or protocol
    /// communication fails.
    pub fn declaration_kinds(
        &mut self,
        names: &[&str],
        cancellation: Option<&LeanWorkerCancellationToken>,
        progress: Option<&dyn LeanWorkerProgressSink>,
    ) -> Result<Vec<String>, LeanWorkerError> {
        self.with_session(|worker| worker.worker_declaration_kinds(names, cancellation, progress))
    }
473
    /// Render declaration names in bulk.
    ///
    /// `names` is forwarded, with the optional `cancellation` token and
    /// `progress` sink, to the worker's `worker_declaration_names` request
    /// through `with_session`.
    /// NOTE(review): the returned vector presumably holds one rendered name
    /// per entry of `names`, in input order — confirm against the worker side.
    ///
    /// # Errors
    ///
    /// Returns `LeanWorkerError` if the worker is dead, the child reports a
    /// host error, cancellation is observed, a progress sink panics, or protocol
    /// communication fails.
    pub fn declaration_names(
        &mut self,
        names: &[&str],
        cancellation: Option<&LeanWorkerCancellationToken>,
        progress: Option<&dyn LeanWorkerProgressSink>,
    ) -> Result<Vec<String>, LeanWorkerError> {
        self.with_session(|worker| worker.worker_declaration_names(names, cancellation, progress))
    }
489
    /// Elaborate `source` and infer the resulting expression's type.
    ///
    /// The child attempts notation-aware rendering via the optional
    /// `meta_pp_expr` shim (`Lean.PrettyPrinter.ppExpr`) and falls back to
    /// `Expr.toString` when the shim is absent or reports `Unsupported`. The
    /// returned [`LeanWorkerRendered::rendering`] reports which path produced
    /// the value.
    ///
    /// Heartbeat budgeting: each `MetaM` pass (the primary `inferType` call
    /// and the pretty-printer) runs under the same
    /// [`LeanWorkerElabOptions::heartbeat_limit`] value, independently
    /// bounded — the pretty-printer does not consume budget left over from
    /// the primary call. A `Failed` or `TimeoutOrHeartbeat` reported by the
    /// pretty-printer surfaces as the *whole* call's failure (matching
    /// in-process behaviour); there is no path that returns the inferred
    /// expression alongside a pretty-printer failure. Only `Unsupported`
    /// from `pp_expr` triggers the raw fallback.
    ///
    /// On the parent side this method only forwards its arguments to the
    /// worker's `worker_infer_type` request through `with_session`; the
    /// behaviour described above is carried out by the child.
    ///
    /// # Errors
    ///
    /// Returns `LeanWorkerError` if the worker is dead, the child reports a
    /// host error, cancellation is observed, a progress sink panics, or
    /// protocol communication fails. Lean-side failures (type errors,
    /// heartbeat exhaustion, missing capability) surface inside the returned
    /// [`LeanWorkerMetaResult`] rather than as `Err`.
    pub fn infer_type(
        &mut self,
        source: &str,
        options: &LeanWorkerElabOptions,
        cancellation: Option<&LeanWorkerCancellationToken>,
        progress: Option<&dyn LeanWorkerProgressSink>,
    ) -> Result<LeanWorkerMetaResult<LeanWorkerRendered>, LeanWorkerError> {
        self.with_session(|worker| worker.worker_infer_type(source, options, cancellation, progress))
    }
524
    /// Elaborate `source` and reduce it to weak head normal form.
    ///
    /// Rendering and heartbeat-budgeting semantics match [`Self::infer_type`]:
    /// the child attempts notation-aware rendering via `meta_pp_expr` and
    /// falls back to `Expr.toString` when the shim reports `Unsupported`.
    /// Each `MetaM` pass is independently bounded by `heartbeat_limit`; a
    /// `Failed` or `TimeoutOrHeartbeat` from the pretty-printer surfaces as
    /// the whole call's failure.
    ///
    /// On the parent side this method only forwards its arguments to the
    /// worker's `worker_whnf` request through `with_session`.
    ///
    /// # Errors
    ///
    /// Returns `LeanWorkerError` under the same conditions as
    /// [`Self::infer_type`]. Lean-side failures surface inside the returned
    /// [`LeanWorkerMetaResult`].
    pub fn whnf(
        &mut self,
        source: &str,
        options: &LeanWorkerElabOptions,
        cancellation: Option<&LeanWorkerCancellationToken>,
        progress: Option<&dyn LeanWorkerProgressSink>,
    ) -> Result<LeanWorkerMetaResult<LeanWorkerRendered>, LeanWorkerError> {
        self.with_session(|worker| worker.worker_whnf(source, options, cancellation, progress))
    }
548
    /// Elaborate `lhs` and `rhs` and ask Lean whether they are definitionally
    /// equal at the supplied transparency.
    ///
    /// All arguments are forwarded unchanged to the worker's
    /// `worker_is_def_eq` request through `with_session`. The answer is the
    /// `bool` carried by the returned [`LeanWorkerMetaResult`].
    ///
    /// # Errors
    ///
    /// Returns `LeanWorkerError` under the same conditions as
    /// [`Self::infer_type`]. Lean-side failures surface inside the returned
    /// [`LeanWorkerMetaResult`].
    pub fn is_def_eq(
        &mut self,
        lhs: &str,
        rhs: &str,
        transparency: LeanWorkerMetaTransparency,
        options: &LeanWorkerElabOptions,
        cancellation: Option<&LeanWorkerCancellationToken>,
        progress: Option<&dyn LeanWorkerProgressSink>,
    ) -> Result<LeanWorkerMetaResult<bool>, LeanWorkerError> {
        self.with_session(|worker| worker.worker_is_def_eq(lhs, rhs, transparency, options, cancellation, progress))
    }
568
    /// Describe a declaration: its kind, rendered type, and source range.
    ///
    /// Returns `Ok(None)` when the name is not in the session's open
    /// environment.
    ///
    /// `name`, the optional `cancellation` token, and the optional `progress`
    /// sink are forwarded to the worker's `worker_describe` request through
    /// `with_session`.
    ///
    /// # Errors
    ///
    /// Returns `LeanWorkerError` if the worker is dead, the child reports a
    /// host error, cancellation is observed, a progress sink panics, or
    /// protocol communication fails.
    pub fn describe(
        &mut self,
        name: &str,
        cancellation: Option<&LeanWorkerCancellationToken>,
        progress: Option<&dyn LeanWorkerProgressSink>,
    ) -> Result<Option<LeanWorkerDeclarationRow>, LeanWorkerError> {
        self.with_session(|worker| worker.worker_describe(name, cancellation, progress))
    }
587
    /// Enumerate the session's open environment and return the matching
    /// declaration names as dotted strings.
    ///
    /// The child streams names one per protocol frame so total payload size
    /// is unbounded; any single Lean name fits well under the per-frame cap.
    ///
    /// `filter` is passed to the worker's `worker_list_declarations_strings`
    /// request by value, together with the optional `cancellation` token and
    /// `progress` sink.
    ///
    /// # Errors
    ///
    /// Returns `LeanWorkerError` under the same conditions as
    /// [`Self::describe`].
    pub fn list_declarations_strings(
        &mut self,
        filter: &LeanWorkerDeclarationFilter,
        cancellation: Option<&LeanWorkerCancellationToken>,
        progress: Option<&dyn LeanWorkerProgressSink>,
    ) -> Result<Vec<String>, LeanWorkerError> {
        // `*filter` copies the filter out of the borrow, which relies on the
        // filter type being `Copy`.
        self.with_session(|worker| worker.worker_list_declarations_strings(*filter, cancellation, progress))
    }
606
    /// Describe a batch of declarations in one IPC round-trip.
    ///
    /// Each input name produces one row in the returned vector, in the same
    /// order. Absent names keep their slot with `kind == "missing"`,
    /// `type_signature: None`, and `source: None` so callers can correlate
    /// rows back to inputs positionally.
    ///
    /// `names`, the optional `cancellation` token, and the optional
    /// `progress` sink are forwarded to the worker's `worker_describe_bulk`
    /// request through `with_session`.
    ///
    /// # Errors
    ///
    /// Returns `LeanWorkerError` under the same conditions as
    /// [`Self::describe`].
    pub fn describe_bulk(
        &mut self,
        names: &[&str],
        cancellation: Option<&LeanWorkerCancellationToken>,
        progress: Option<&dyn LeanWorkerProgressSink>,
    ) -> Result<Vec<LeanWorkerDeclarationRow>, LeanWorkerError> {
        self.with_session(|worker| worker.worker_describe_bulk(names, cancellation, progress))
    }
626
    /// Parse, elaborate, and project a Lean source string into an
    /// `Elab.InfoTree` projection. The session's open environment supplies
    /// the imports; the source must not declare its own header.
    ///
    /// On the parent side this method only forwards its arguments to the
    /// worker's `worker_process_file` request through `with_session`.
    ///
    /// # Errors
    ///
    /// Returns `LeanWorkerError` if the worker is dead, the child reports a
    /// host error, cancellation is observed, a progress sink panics, or
    /// protocol communication fails. Per-command elaboration failures appear
    /// inside `LeanWorkerProcessedFile::diagnostics`; the
    /// `LeanWorkerProcessFileOutcome::Unsupported` arm surfaces missing
    /// capability shims.
    pub fn process_file(
        &mut self,
        source: &str,
        options: &LeanWorkerElabOptions,
        cancellation: Option<&LeanWorkerCancellationToken>,
        progress: Option<&dyn LeanWorkerProgressSink>,
    ) -> Result<LeanWorkerProcessFileOutcome, LeanWorkerError> {
        self.with_session(|worker| worker.worker_process_file(source, options, cancellation, progress))
    }
648
    /// Parse a Lean module header (`import` declarations and prelude) and
    /// elaborate the body; project both into an `Elab.InfoTree` projection
    /// plus the parsed import list.
    ///
    /// On the parent side this method only forwards its arguments to the
    /// worker's `worker_process_module` request through `with_session`.
    ///
    /// # Errors
    ///
    /// Returns `LeanWorkerError` under the same conditions as
    /// [`Self::process_file`]. Header-parse failures, missing imports, and
    /// missing capability shims surface as variants of
    /// [`LeanWorkerProcessModuleOutcome`].
    pub fn process_module(
        &mut self,
        source: &str,
        options: &LeanWorkerElabOptions,
        cancellation: Option<&LeanWorkerCancellationToken>,
        progress: Option<&dyn LeanWorkerProgressSink>,
    ) -> Result<LeanWorkerProcessModuleOutcome, LeanWorkerError> {
        self.with_session(|worker| worker.worker_process_module(source, options, cancellation, progress))
    }
668
    /// Run a downstream streaming export and deliver JSON rows to `rows`.
    ///
    /// The Lean export must have ABI
    /// `String -> USize -> USize -> IO UInt8`. The child supplies the
    /// callback handle and trampoline; the parent only sees validated
    /// `LeanWorkerDataRow` values.
    ///
    /// `export` names the Lean export and `request` is the JSON request
    /// value. `diagnostics`, `cancellation`, and `progress` are optional. All
    /// arguments are forwarded to the worker's `worker_run_data_stream`
    /// request through `with_session`.
    ///
    /// # Errors
    ///
    /// Returns `LeanWorkerError` if the worker is dead, the child reports a
    /// host or stream error, cancellation is observed, a sink panics, or
    /// protocol communication fails. In-flight cancellation cycles the child
    /// and invalidates this session.
    pub fn run_data_stream(
        &mut self,
        export: &str,
        request: &Value,
        rows: &dyn LeanWorkerDataSink,
        diagnostics: Option<&dyn LeanWorkerDiagnosticSink>,
        cancellation: Option<&LeanWorkerCancellationToken>,
        progress: Option<&dyn LeanWorkerProgressSink>,
    ) -> Result<LeanWorkerStreamSummary, LeanWorkerError> {
        self.with_session(|worker| {
            worker.worker_run_data_stream(export, request, rows, diagnostics, cancellation, progress)
        })
    }
695
    /// Private raw-row counterpart of `run_data_stream`.
    ///
    /// Takes the same arguments, except that `rows` is a
    /// `LeanWorkerRawDataSink`, whose rows keep their payload as unparsed
    /// JSON. All arguments are forwarded to the worker's
    /// `worker_run_data_stream_raw` request through `with_session`.
    /// `run_streaming_command` uses this so it can decode each payload into
    /// the caller's row type itself.
    ///
    /// # Errors
    ///
    /// Returns whatever `LeanWorkerError` the forwarded request or
    /// `with_session` produces.
    fn run_data_stream_raw(
        &mut self,
        export: &str,
        request: &Value,
        rows: &dyn LeanWorkerRawDataSink,
        diagnostics: Option<&dyn LeanWorkerDiagnosticSink>,
        cancellation: Option<&LeanWorkerCancellationToken>,
        progress: Option<&dyn LeanWorkerProgressSink>,
    ) -> Result<LeanWorkerStreamSummary, LeanWorkerError> {
        self.with_session(|worker| {
            worker.worker_run_data_stream_raw(export, request, rows, diagnostics, cancellation, progress)
        })
    }
709
710    /// Run a typed non-streaming downstream JSON command.
711    ///
712    /// The Lean export must have ABI `String -> IO String`. The request is
713    /// serialized from `Req`; the returned JSON string is decoded into `Resp`.
714    /// Use this for commands that return one terminal JSON value and no rows.
715    ///
716    /// # Errors
717    ///
718    /// Returns `LeanWorkerError` if request encoding fails, the worker is
719    /// dead, the session was invalidated, the export is missing, response
720    /// decoding fails, cancellation or timeout is observed, a progress sink
721    /// panics, or protocol communication fails.
722    pub fn run_json_command<Req, Resp>(
723        &mut self,
724        command: &LeanWorkerJsonCommand<Req, Resp>,
725        request: &Req,
726        cancellation: Option<&LeanWorkerCancellationToken>,
727        progress: Option<&dyn LeanWorkerProgressSink>,
728    ) -> Result<Resp, LeanWorkerError>
729    where
730        Req: Serialize,
731        Resp: DeserializeOwned,
732    {
733        let request_json =
734            serde_json::to_string(request).map_err(|err| LeanWorkerError::TypedCommandRequestEncode {
735                export: command.export().to_owned(),
736                message: err.to_string(),
737            })?;
738        let response_json = self.with_session(|worker| {
739            worker.worker_json_command(command.export(), request_json, cancellation, progress)
740        })?;
741        serde_json::from_str(&response_json).map_err(|err| LeanWorkerError::TypedCommandResponseDecode {
742            export: command.export().to_owned(),
743            message: err.to_string(),
744        })
745    }
746
747    /// Run a typed downstream streaming command.
748    ///
749    /// The Lean export must have ABI
750    /// `String -> USize -> USize -> IO UInt8`. The request is serialized from
751    /// `Req`; each row payload is decoded into `Row`; terminal metadata is
752    /// decoded into `Summary` when present. Raw-row access remains available
753    /// through `run_data_stream`.
754    ///
755    /// # Errors
756    ///
757    /// Returns `LeanWorkerError` if request encoding fails, row or summary
758    /// decoding fails, the worker is dead, the session was invalidated, the
759    /// export fails, cancellation or timeout is observed, a sink panics, or
760    /// protocol communication fails. Row decode errors include the stream and
761    /// sequence that identified the bad payload.
762    pub fn run_streaming_command<Req, Row, Summary>(
763        &mut self,
764        command: &LeanWorkerStreamingCommand<Req, Row, Summary>,
765        request: &Req,
766        rows: &dyn LeanWorkerTypedDataSink<Row>,
767        diagnostics: Option<&dyn LeanWorkerDiagnosticSink>,
768        cancellation: Option<&LeanWorkerCancellationToken>,
769        progress: Option<&dyn LeanWorkerProgressSink>,
770    ) -> Result<LeanWorkerTypedStreamSummary<Summary>, LeanWorkerError>
771    where
772        Req: Serialize,
773        Row: DeserializeOwned,
774        Summary: DeserializeOwned,
775    {
776        let request_value =
777            serde_json::to_value(request).map_err(|err| LeanWorkerError::TypedCommandRequestEncode {
778                export: command.export().to_owned(),
779                message: err.to_string(),
780            })?;
781        let internal_cancellation = LeanWorkerCancellationToken::new();
782        let cancellation_for_stream = cancellation.unwrap_or(&internal_cancellation);
783        let typed_sink = TypedRawDataSink {
784            export: command.export(),
785            rows,
786            cancellation: cancellation_for_stream,
787            decode_error: std::sync::Mutex::new(None),
788        };
789
790        // `run_data_stream_raw` invalidates the session on Cancelled/Timeout
791        // via `with_session`; we only reshape the result here.
792        match self.run_data_stream_raw(
793            command.export(),
794            &request_value,
795            &typed_sink,
796            diagnostics,
797            Some(cancellation_for_stream),
798            progress,
799        ) {
800            Ok(summary) => {
801                if let Some(err) = typed_sink.take_decode_error() {
802                    return Err(err);
803                }
804                let metadata = summary
805                    .metadata
806                    .map(|metadata| {
807                        serde_json::from_value(metadata).map_err(|err| LeanWorkerError::TypedCommandSummaryDecode {
808                            export: command.export().to_owned(),
809                            message: err.to_string(),
810                        })
811                    })
812                    .transpose()?;
813                Ok(LeanWorkerTypedStreamSummary {
814                    total_rows: summary.total_rows,
815                    per_stream_counts: summary.per_stream_counts,
816                    elapsed: summary.elapsed,
817                    metadata,
818                })
819            }
820            Err(LeanWorkerError::Cancelled { .. }) => {
821                if let Some(err) = typed_sink.take_decode_error() {
822                    Err(err)
823                } else {
824                    Err(LeanWorkerError::Cancelled {
825                        operation: "worker_run_data_stream",
826                    })
827                }
828            }
829            Err(err) => Err(err),
830        }
831    }
832
833    /// Query generic metadata from a downstream capability export.
834    ///
835    /// The Lean export must have ABI `String -> IO String`. The request and
836    /// response strings are JSON, but callers receive a typed metadata
837    /// envelope rather than private protocol frames.
838    ///
839    /// # Errors
840    ///
841    /// Returns `LeanWorkerError` if the worker is dead, the session was
842    /// invalidated, the export is missing, request or response JSON is
843    /// malformed, cancellation or timeout is observed, a progress sink panics,
844    /// or protocol communication fails.
845    pub fn capability_metadata(
846        &mut self,
847        export: &str,
848        request: &Value,
849        cancellation: Option<&LeanWorkerCancellationToken>,
850        progress: Option<&dyn LeanWorkerProgressSink>,
851    ) -> Result<LeanWorkerCapabilityMetadata, LeanWorkerError> {
852        self.with_session(|worker| worker.worker_capability_metadata(export, request, cancellation, progress))
853    }
854
855    /// Run a generic doctor check from a downstream capability export.
856    ///
857    /// The Lean export must have ABI `String -> IO String`. Doctor diagnostics
858    /// are capability-layer facts; data rows remain reserved for downstream
859    /// streaming payloads.
860    ///
861    /// # Errors
862    ///
863    /// Returns `LeanWorkerError` if the worker is dead, the session was
864    /// invalidated, the export is missing, request or response JSON is
865    /// malformed, cancellation or timeout is observed, a progress sink panics,
866    /// or protocol communication fails.
867    pub fn capability_doctor(
868        &mut self,
869        export: &str,
870        request: &Value,
871        cancellation: Option<&LeanWorkerCancellationToken>,
872        progress: Option<&dyn LeanWorkerProgressSink>,
873    ) -> Result<LeanWorkerDoctorReport, LeanWorkerError> {
874        self.with_session(|worker| worker.worker_capability_doctor(export, request, cancellation, progress))
875    }
876
877    fn ensure_open(&self) -> Result<(), LeanWorkerError> {
878        if self.open {
879            Ok(())
880        } else {
881            Err(LeanWorkerError::UnsupportedRequest {
882                operation: "worker_session_invalidated",
883            })
884        }
885    }
886
887    /// Run an operation against the underlying worker, applying the session
888    /// invalidation policy uniformly.
889    ///
890    /// Centralizes the rule "Cancelled or Timeout from the worker invalidates
891    /// this session" so every typed method delegates here instead of repeating
892    /// the same `match` discriminator. Adding a new terminal-failure variant
893    /// to the invalidation set is now a one-line edit.
894    fn with_session<T>(
895        &mut self,
896        f: impl FnOnce(&mut LeanWorker) -> Result<T, LeanWorkerError>,
897    ) -> Result<T, LeanWorkerError> {
898        self.ensure_open()?;
899        let result = f(self.worker);
900        if matches!(
901            result,
902            Err(LeanWorkerError::Cancelled { .. } | LeanWorkerError::Timeout { .. })
903        ) {
904            self.open = false;
905        }
906        result
907    }
908}
909
910struct TypedRawDataSink<'a, Row> {
911    export: &'a str,
912    rows: &'a dyn LeanWorkerTypedDataSink<Row>,
913    cancellation: &'a LeanWorkerCancellationToken,
914    decode_error: std::sync::Mutex<Option<LeanWorkerError>>,
915}
916
917impl<Row> TypedRawDataSink<'_, Row> {
918    fn take_decode_error(&self) -> Option<LeanWorkerError> {
919        self.decode_error.lock().ok().and_then(|mut guard| guard.take())
920    }
921}
922
923impl<Row> LeanWorkerRawDataSink for TypedRawDataSink<'_, Row>
924where
925    Row: DeserializeOwned,
926{
927    fn report(&self, row: LeanWorkerRawDataRow) {
928        match serde_json::from_str(row.payload.get()) {
929            Ok(payload) => self.rows.report(LeanWorkerTypedDataRow {
930                stream: row.stream,
931                sequence: row.sequence,
932                payload,
933            }),
934            Err(err) => {
935                if let Ok(mut guard) = self.decode_error.lock() {
936                    *guard = Some(LeanWorkerError::TypedCommandRowDecode {
937                        export: self.export.to_owned(),
938                        stream: row.stream,
939                        sequence: row.sequence,
940                        message: err.to_string(),
941                    });
942                }
943                self.cancellation.cancel();
944            }
945        }
946    }
947}
948
949pub(crate) fn check_cancelled(
950    operation: &'static str,
951    token: Option<&LeanWorkerCancellationToken>,
952) -> Result<(), LeanWorkerError> {
953    if token.is_some_and(LeanWorkerCancellationToken::is_cancelled) {
954        Err(LeanWorkerError::Cancelled { operation })
955    } else {
956        Ok(())
957    }
958}
959
960pub(crate) fn report_parent_progress(
961    sink: Option<&dyn LeanWorkerProgressSink>,
962    event: LeanWorkerProgressEvent,
963) -> Result<(), LeanWorkerError> {
964    let Some(sink) = sink else {
965        return Ok(());
966    };
967    std::panic::catch_unwind(std::panic::AssertUnwindSafe(|| sink.report(event))).map_err(|payload| {
968        let message = if let Some(s) = payload.downcast_ref::<&str>() {
969            (*s).to_owned()
970        } else if let Some(s) = payload.downcast_ref::<String>() {
971            s.clone()
972        } else {
973            "worker progress sink panicked".to_owned()
974        };
975        LeanWorkerError::ProgressPanic { message }
976    })
977}
978
979pub(crate) fn report_parent_data_row(
980    sink: Option<LeanWorkerDataSinkTarget<'_>>,
981    row: DataRow,
982) -> Result<(), LeanWorkerError> {
983    let Some(sink) = sink else {
984        return Err(LeanWorkerError::Protocol {
985            message: "worker sent data row for a request without a row sink".to_owned(),
986        });
987    };
988    match sink {
989        LeanWorkerDataSinkTarget::Value(sink) => {
990            let row = LeanWorkerDataRow::try_from(row)?;
991            report_value_data_row(sink, row)
992        }
993        LeanWorkerDataSinkTarget::Raw(sink) => report_raw_data_row(sink, row.into()),
994    }
995}
996
997fn report_value_data_row(sink: &dyn LeanWorkerDataSink, row: LeanWorkerDataRow) -> Result<(), LeanWorkerError> {
998    std::panic::catch_unwind(std::panic::AssertUnwindSafe(|| sink.report(row))).map_err(|payload| {
999        let message = if let Some(s) = payload.downcast_ref::<&str>() {
1000            (*s).to_owned()
1001        } else if let Some(s) = payload.downcast_ref::<String>() {
1002            s.clone()
1003        } else {
1004            "worker data sink panicked".to_owned()
1005        };
1006        LeanWorkerError::DataSinkPanic { message }
1007    })
1008}
1009
1010fn report_raw_data_row(sink: &dyn LeanWorkerRawDataSink, row: LeanWorkerRawDataRow) -> Result<(), LeanWorkerError> {
1011    std::panic::catch_unwind(std::panic::AssertUnwindSafe(|| sink.report(row))).map_err(|payload| {
1012        let message = if let Some(s) = payload.downcast_ref::<&str>() {
1013            (*s).to_owned()
1014        } else if let Some(s) = payload.downcast_ref::<String>() {
1015            s.clone()
1016        } else {
1017            "worker data sink panicked".to_owned()
1018        };
1019        LeanWorkerError::DataSinkPanic { message }
1020    })
1021}
1022
1023pub(crate) fn report_parent_diagnostic(
1024    sink: Option<&dyn LeanWorkerDiagnosticSink>,
1025    diagnostic: LeanWorkerDiagnosticEvent,
1026) -> Result<(), LeanWorkerError> {
1027    let Some(sink) = sink else {
1028        return Ok(());
1029    };
1030    std::panic::catch_unwind(std::panic::AssertUnwindSafe(|| sink.report(diagnostic))).map_err(|payload| {
1031        let message = if let Some(s) = payload.downcast_ref::<&str>() {
1032            (*s).to_owned()
1033        } else if let Some(s) = payload.downcast_ref::<String>() {
1034            s.clone()
1035        } else {
1036            "worker diagnostic sink panicked".to_owned()
1037        };
1038        LeanWorkerError::DiagnosticSinkPanic { message }
1039    })
1040}
1041
1042pub(crate) fn elapsed_event(
1043    phase: String,
1044    current: u64,
1045    total: Option<u64>,
1046    started: Instant,
1047) -> LeanWorkerProgressEvent {
1048    LeanWorkerProgressEvent {
1049        phase,
1050        current,
1051        total,
1052        elapsed: started.elapsed(),
1053    }
1054}