Skip to main content

lean_rs_worker/
session.rs

1//! Worker-host adapter over the process-boundary supervisor.
2//!
3//! This module is intentionally narrower than `lean-rs-host::LeanSession`.
4//! It exposes serializable outcomes that make sense across a child process:
5//! declaration text, elaboration diagnostics, and kernel-check status. Runtime
6//! handles such as `LeanExpr` and `LeanEvidence` stay inside the child.
7
8use std::collections::BTreeMap;
9use std::fmt;
10use std::marker::PhantomData;
11use std::path::PathBuf;
12use std::sync::Arc;
13use std::sync::atomic::{AtomicBool, Ordering};
14use std::time::{Duration, Instant};
15
16use serde::Serialize;
17use serde::de::DeserializeOwned;
18use serde_json::Value;
19use serde_json::value::RawValue;
20
21use crate::protocol::{
22    DataRow, Diagnostic, StreamSummary, WorkerCapabilityFact, WorkerCapabilityMetadata, WorkerCommandMetadata,
23    WorkerDiagnostic, WorkerDoctorDiagnostic, WorkerDoctorReport, WorkerDoctorSeverity, WorkerElabOptions,
24    WorkerElabOutcome, WorkerKernelOutcome, WorkerKernelStatus,
25};
26use crate::supervisor::{LeanWorker, LeanWorkerError};
27
/// Settings needed to open a single host session inside a worker child.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct LeanWorkerSessionConfig {
    /// Project root path, stored exactly as supplied to [`Self::new`].
    project_root: PathBuf,
    /// Package name supplied by the caller.
    package: String,
    /// Library name supplied by the caller.
    lib_name: String,
    /// Import names, kept in the order the caller supplied them.
    imports: Vec<String>,
}

impl LeanWorkerSessionConfig {
    /// Build a session configuration for a Lake capability and import list.
    ///
    /// Every argument is converted into its owned form; `imports` preserves
    /// the iteration order of the input.
    pub fn new(
        project_root: impl Into<PathBuf>,
        package: impl Into<String>,
        lib_name: impl Into<String>,
        imports: impl IntoIterator<Item = impl Into<String>>,
    ) -> Self {
        let imports: Vec<String> = imports.into_iter().map(|import| import.into()).collect();
        Self {
            project_root: project_root.into(),
            package: package.into(),
            lib_name: lib_name.into(),
            imports,
        }
    }

    /// Project root rendered as an owned string.
    ///
    /// The conversion is lossy: path bytes that are not valid Unicode are
    /// replaced rather than reported as an error.
    pub(crate) fn project_root_string(&self) -> String {
        let lossy = self.project_root.to_string_lossy();
        lossy.into_owned()
    }

    /// Package name.
    pub(crate) fn package(&self) -> &str {
        self.package.as_str()
    }

    /// Library name.
    pub(crate) fn lib_name(&self) -> &str {
        self.lib_name.as_str()
    }

    /// Import names in caller-supplied order.
    pub(crate) fn imports(&self) -> &[String] {
        self.imports.as_slice()
    }
}
69
70/// Bounded elaboration options for worker-session requests.
71///
72/// This mirrors the stable knobs from `lean-rs-host::LeanElabOptions` without
73/// exposing the in-child host object itself across the process boundary.
74#[derive(Clone, Debug, Eq, PartialEq)]
75pub struct LeanWorkerElabOptions {
76    namespace_context: String,
77    file_label: String,
78    heartbeat_limit: u64,
79    diagnostic_byte_limit: usize,
80}
81
82impl LeanWorkerElabOptions {
83    /// Create worker elaboration options with `lean-rs-host` defaults.
84    #[must_use]
85    pub fn new() -> Self {
86        Self::default()
87    }
88
89    /// Replace the namespace context.
90    #[must_use]
91    pub fn namespace_context(mut self, namespace: &str) -> Self {
92        namespace.clone_into(&mut self.namespace_context);
93        self
94    }
95
96    /// Replace the diagnostic file label.
97    #[must_use]
98    pub fn file_label(mut self, label: &str) -> Self {
99        label.clone_into(&mut self.file_label);
100        self
101    }
102
103    /// Replace the heartbeat limit. The child applies the host ceiling.
104    #[must_use]
105    pub fn heartbeat_limit(mut self, heartbeats: u64) -> Self {
106        self.heartbeat_limit = heartbeats;
107        self
108    }
109
110    /// Replace the diagnostic byte limit. The child applies the host ceiling.
111    #[must_use]
112    pub fn diagnostic_byte_limit(mut self, bytes: usize) -> Self {
113        self.diagnostic_byte_limit = bytes;
114        self
115    }
116
117    pub(crate) fn wire(&self) -> WorkerElabOptions {
118        WorkerElabOptions {
119            namespace_context: self.namespace_context.clone(),
120            file_label: self.file_label.clone(),
121            heartbeat_limit: self.heartbeat_limit,
122            diagnostic_byte_limit: self.diagnostic_byte_limit,
123        }
124    }
125}
126
127impl Default for LeanWorkerElabOptions {
128    fn default() -> Self {
129        Self {
130            namespace_context: String::new(),
131            file_label: "<elaborate>".to_owned(),
132            heartbeat_limit: lean_rs_host::LEAN_HEARTBEAT_LIMIT_DEFAULT,
133            diagnostic_byte_limit: lean_rs_host::LEAN_DIAGNOSTIC_BYTE_LIMIT_DEFAULT,
134        }
135    }
136}
137
/// Protocol and runtime facts the worker child reports during handshake.
///
/// These describe the `lean-rs-worker` process and its framing contract. They
/// are distinct from the downstream capability metadata returned by
/// `LeanWorkerSession::capability_metadata`.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct LeanWorkerRuntimeMetadata {
    /// Worker version string reported by the child.
    pub worker_version: String,
    /// Protocol version number reported by the child.
    pub protocol_version: u16,
    /// Lean version string, if the child reported one.
    pub lean_version: Option<String>,
}
149
150/// Generic metadata reported by one downstream capability package.
151///
152/// Command names, capability names, versions, and `extra` JSON are downstream
153/// semantics. `lean-rs-worker` transports and validates the envelope; it does
154/// not decide which values affect caches.
155#[derive(Clone, Debug, Eq, PartialEq)]
156pub struct LeanWorkerCapabilityMetadata {
157    pub commands: Vec<LeanWorkerCommandMetadata>,
158    pub capabilities: Vec<LeanWorkerCapabilityFact>,
159    pub lean_version: Option<String>,
160    pub extra: Option<Value>,
161}
162
163impl From<WorkerCapabilityMetadata> for LeanWorkerCapabilityMetadata {
164    fn from(value: WorkerCapabilityMetadata) -> Self {
165        Self {
166            commands: value.commands.into_iter().map(Into::into).collect(),
167            capabilities: value.capabilities.into_iter().map(Into::into).collect(),
168            lean_version: value.lean_version,
169            extra: value.extra,
170        }
171    }
172}
173
174/// One downstream command advertised by capability metadata.
175#[derive(Clone, Debug, Eq, PartialEq)]
176pub struct LeanWorkerCommandMetadata {
177    pub name: String,
178    pub version: String,
179}
180
181impl From<WorkerCommandMetadata> for LeanWorkerCommandMetadata {
182    fn from(value: WorkerCommandMetadata) -> Self {
183        Self {
184            name: value.name,
185            version: value.version,
186        }
187    }
188}
189
190/// One named capability advertised by capability metadata.
191#[derive(Clone, Debug, Eq, PartialEq)]
192pub struct LeanWorkerCapabilityFact {
193    pub name: String,
194    pub version: String,
195}
196
197impl From<WorkerCapabilityFact> for LeanWorkerCapabilityFact {
198    fn from(value: WorkerCapabilityFact) -> Self {
199        Self {
200            name: value.name,
201            version: value.version,
202        }
203    }
204}
205
206/// Severity for a capability doctor diagnostic.
207#[derive(Clone, Copy, Debug, Eq, PartialEq)]
208#[non_exhaustive]
209pub enum LeanWorkerDoctorSeverity {
210    Pass,
211    Warning,
212    Error,
213}
214
215impl From<WorkerDoctorSeverity> for LeanWorkerDoctorSeverity {
216    fn from(value: WorkerDoctorSeverity) -> Self {
217        match value {
218            WorkerDoctorSeverity::Pass => Self::Pass,
219            WorkerDoctorSeverity::Warning => Self::Warning,
220            WorkerDoctorSeverity::Error => Self::Error,
221        }
222    }
223}
224
225/// One structured capability health diagnostic.
226#[derive(Clone, Debug, Eq, PartialEq)]
227pub struct LeanWorkerDoctorDiagnostic {
228    pub severity: LeanWorkerDoctorSeverity,
229    pub code: String,
230    pub message: String,
231    pub details: Option<Value>,
232}
233
234impl From<WorkerDoctorDiagnostic> for LeanWorkerDoctorDiagnostic {
235    fn from(value: WorkerDoctorDiagnostic) -> Self {
236        Self {
237            severity: value.severity.into(),
238            code: value.code,
239            message: value.message,
240            details: value.details,
241        }
242    }
243}
244
245/// Capability health report returned by a downstream doctor export.
246#[derive(Clone, Debug, Eq, PartialEq)]
247pub struct LeanWorkerDoctorReport {
248    pub diagnostics: Vec<LeanWorkerDoctorDiagnostic>,
249    pub metadata: Option<Value>,
250}
251
252impl From<WorkerDoctorReport> for LeanWorkerDoctorReport {
253    fn from(value: WorkerDoctorReport) -> Self {
254        Self {
255            diagnostics: value.diagnostics.into_iter().map(Into::into).collect(),
256            metadata: value.metadata,
257        }
258    }
259}
260
/// Cancellation token held by the parent for worker-session requests.
///
/// The supervisor observes cancellation before it sends a request and at
/// worker progress frames while a request is in flight. Cancelling in flight
/// cycles the child process; no in-process `LeanCancellationToken` is shared
/// with the child.
///
/// Clones share one flag, so cancelling any clone is visible through all of
/// them.
#[derive(Debug, Clone, Default)]
pub struct LeanWorkerCancellationToken {
    /// Shared flag; `true` once cancellation has been requested.
    cancelled: Arc<AtomicBool>,
}

impl LeanWorkerCancellationToken {
    /// Create a token whose flag starts out cleared.
    #[must_use]
    pub fn new() -> Self {
        Self {
            cancelled: Arc::new(AtomicBool::new(false)),
        }
    }

    /// Request cancellation by setting the shared flag.
    pub fn cancel(&self) {
        let flag: &AtomicBool = &self.cancelled;
        flag.store(true, Ordering::Release);
    }

    /// Report whether cancellation has been requested.
    #[must_use]
    pub fn is_cancelled(&self) -> bool {
        let flag: &AtomicBool = &self.cancelled;
        flag.load(Ordering::Acquire)
    }
}
290
/// A single progress event the parent observed for a worker request.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct LeanWorkerProgressEvent {
    /// Phase label for this event.
    pub phase: String,
    /// Current progress counter.
    pub current: u64,
    /// Total for the counter, when one is known.
    pub total: Option<u64>,
    /// Elapsed time associated with this event.
    pub elapsed: Duration,
}
299
300/// Parent-side sink for worker progress events.
301pub trait LeanWorkerProgressSink: Send + Sync {
302    fn report(&self, event: LeanWorkerProgressEvent);
303}
304
305/// One downstream-owned JSON row delivered over a worker request.
306///
307/// `stream` is a caller-defined channel name. `sequence` starts at zero per
308/// stream inside one request and is assigned by `lean-rs-worker`. `payload` is
309/// owned JSON; callers may keep it after `LeanWorkerDataSink::report` returns.
310#[derive(Clone, Debug, Eq, PartialEq)]
311pub struct LeanWorkerDataRow {
312    pub stream: String,
313    pub sequence: u64,
314    pub payload: Value,
315}
316
317impl TryFrom<DataRow> for LeanWorkerDataRow {
318    type Error = LeanWorkerError;
319
320    fn try_from(value: DataRow) -> Result<Self, Self::Error> {
321        let payload = serde_json::from_str(value.payload.get()).map_err(|err| LeanWorkerError::Protocol {
322            message: format!("worker data-row payload decode failed: {err}"),
323        })?;
324        Ok(Self {
325            stream: value.stream,
326            sequence: value.sequence,
327            payload,
328        })
329    }
330}
331
332/// Parent-side sink for downstream data rows produced by one worker request.
333///
334/// A sink is borrowed for one request. It receives owned rows and may store
335/// them. If `report` panics, the supervisor catches the panic and returns
336/// `LeanWorkerError::DataSinkPanic`.
337pub trait LeanWorkerDataSink: Send + Sync {
338    fn report(&self, row: LeanWorkerDataRow);
339}
340
341pub(crate) struct LeanWorkerRawDataRow {
342    pub(crate) stream: String,
343    pub(crate) sequence: u64,
344    pub(crate) payload: Box<RawValue>,
345}
346
347impl From<DataRow> for LeanWorkerRawDataRow {
348    fn from(value: DataRow) -> Self {
349        Self {
350            stream: value.stream,
351            sequence: value.sequence,
352            payload: value.payload,
353        }
354    }
355}
356
357pub(crate) trait LeanWorkerRawDataSink: Send + Sync {
358    fn report(&self, row: LeanWorkerRawDataRow);
359}
360
361#[derive(Clone, Copy)]
362pub(crate) enum LeanWorkerDataSinkTarget<'a> {
363    Value(&'a dyn LeanWorkerDataSink),
364    Raw(&'a dyn LeanWorkerRawDataSink),
365}
366
367/// One diagnostic message delivered over a worker request.
368///
369/// Diagnostics are control/observability messages, not data rows. They are
370/// delivered through `LeanWorkerDiagnosticSink` so row payloads remain
371/// downstream-owned data.
372#[derive(Clone, Debug, Eq, PartialEq)]
373pub struct LeanWorkerDiagnosticEvent {
374    pub code: String,
375    pub message: String,
376}
377
378impl From<Diagnostic> for LeanWorkerDiagnosticEvent {
379    fn from(value: Diagnostic) -> Self {
380        Self {
381            code: value.code,
382            message: value.message,
383        }
384    }
385}
386
387/// Parent-side sink for diagnostics produced by one worker request.
388pub trait LeanWorkerDiagnosticSink: Send + Sync {
389    fn report(&self, diagnostic: LeanWorkerDiagnosticEvent);
390}
391
392/// Summary returned after a worker data-stream export completes.
393///
394/// Rows delivered to `LeanWorkerDataSink` are tentative until this summary is
395/// returned successfully. Downstream callers that need atomic commit should
396/// buffer rows in their sink and commit only after terminal success.
397#[derive(Clone, Debug, Eq, PartialEq)]
398pub struct LeanWorkerStreamSummary {
399    /// Total number of rows delivered to the parent before terminal success.
400    pub total_rows: u64,
401    /// Per-stream row counts assigned by `lean-rs-worker`.
402    pub per_stream_counts: BTreeMap<String, u64>,
403    /// Elapsed time measured in the child for the streaming export.
404    pub elapsed: Duration,
405    /// Optional downstream-defined terminal metadata.
406    pub metadata: Option<Value>,
407}
408
409impl From<StreamSummary> for LeanWorkerStreamSummary {
410    fn from(value: StreamSummary) -> Self {
411        Self {
412            total_rows: value.total_rows,
413            per_stream_counts: value.per_stream_counts,
414            elapsed: Duration::from_micros(value.elapsed_micros),
415            metadata: value.metadata,
416        }
417    }
418}
419
/// A downstream JSON command that does not stream rows.
///
/// The command names a Lean export with ABI `String -> IO String`. `Req` and
/// `Resp` are serde types owned by the downstream crate; `lean-rs-worker` owns
/// request transport, worker lifecycle, timeout, cancellation, and response
/// decoding.
///
/// The trait impls below are written by hand so that none of them places
/// bounds on `Req` or `Resp`.
pub struct LeanWorkerJsonCommand<Req, Resp> {
    /// Name of the Lean export to invoke.
    export: String,
    /// Ties the command to its request/response types without storing them.
    _types: PhantomData<fn(&Req) -> Resp>,
}

impl<Req, Resp> LeanWorkerJsonCommand<Req, Resp> {
    /// Create a typed JSON command bound to one Lean export.
    #[must_use]
    pub fn new(export: impl Into<String>) -> Self {
        let export: String = export.into();
        Self {
            export,
            _types: PhantomData,
        }
    }

    /// Name of the Lean export this command invokes.
    #[must_use]
    pub fn export(&self) -> &str {
        self.export.as_str()
    }
}

impl<Req, Resp> Clone for LeanWorkerJsonCommand<Req, Resp> {
    /// Clone the export name; the type marker carries no data.
    fn clone(&self) -> Self {
        Self::new(self.export.clone())
    }
}

impl<Req, Resp> fmt::Debug for LeanWorkerJsonCommand<Req, Resp> {
    /// Show only the export name; the type marker is omitted.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let mut builder = f.debug_struct("LeanWorkerJsonCommand");
        builder.field("export", &self.export);
        builder.finish()
    }
}

impl<Req, Resp> PartialEq for LeanWorkerJsonCommand<Req, Resp> {
    /// Two commands are equal when they name the same export.
    fn eq(&self, other: &Self) -> bool {
        self.export.as_str() == other.export.as_str()
    }
}

impl<Req, Resp> Eq for LeanWorkerJsonCommand<Req, Resp> {}
471
/// A downstream JSON command that streams rows.
///
/// The command names a Lean export with ABI
/// `String -> USize -> USize -> IO UInt8`. `Req`, `Row`, and `Summary` are
/// serde types owned by the downstream crate. Row and terminal-summary JSON
/// are decoded at the parent boundary, after `lean-rs-worker` has dealt with
/// process lifecycle, framing, diagnostics, timeout, cancellation, and
/// completion.
///
/// The trait impls below are written by hand so that none of them places
/// bounds on `Req`, `Row`, or `Summary`.
pub struct LeanWorkerStreamingCommand<Req, Row, Summary> {
    /// Name of the Lean export to invoke.
    export: String,
    /// Ties the command to its request/row/summary types without storing them.
    _types: PhantomData<fn(&Req) -> (Row, Summary)>,
}

impl<Req, Row, Summary> LeanWorkerStreamingCommand<Req, Row, Summary> {
    /// Create a typed streaming command bound to one Lean export.
    #[must_use]
    pub fn new(export: impl Into<String>) -> Self {
        let export: String = export.into();
        Self {
            export,
            _types: PhantomData,
        }
    }

    /// Name of the Lean export this command invokes.
    #[must_use]
    pub fn export(&self) -> &str {
        self.export.as_str()
    }
}

impl<Req, Row, Summary> Clone for LeanWorkerStreamingCommand<Req, Row, Summary> {
    /// Clone the export name; the type marker carries no data.
    fn clone(&self) -> Self {
        Self::new(self.export.clone())
    }
}

impl<Req, Row, Summary> fmt::Debug for LeanWorkerStreamingCommand<Req, Row, Summary> {
    /// Show only the export name; the type marker is omitted.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let mut builder = f.debug_struct("LeanWorkerStreamingCommand");
        builder.field("export", &self.export);
        builder.finish()
    }
}

impl<Req, Row, Summary> PartialEq for LeanWorkerStreamingCommand<Req, Row, Summary> {
    /// Two commands are equal when they name the same export.
    fn eq(&self, other: &Self) -> bool {
        self.export.as_str() == other.export.as_str()
    }
}

impl<Req, Row, Summary> Eq for LeanWorkerStreamingCommand<Req, Row, Summary> {}
525
/// A single downstream row whose payload has been decoded into `Row`.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct LeanWorkerTypedDataRow<Row> {
    /// Channel name of the originating data row.
    pub stream: String,
    /// Sequence number of the originating data row.
    pub sequence: u64,
    /// Decoded row payload.
    pub payload: Row,
}
533
534/// Parent-side sink for typed downstream data rows produced by one command.
535///
536/// The sink remains request-local. A panic from `report` is contained by the
537/// worker supervisor and returned as `LeanWorkerError::DataSinkPanic`.
538pub trait LeanWorkerTypedDataSink<Row>: Send + Sync {
539    fn report(&self, row: LeanWorkerTypedDataRow<Row>);
540}
541
/// Typed summary handed back once a streaming command reaches terminal success.
///
/// Rows already delivered to a typed sink stay tentative until this summary is
/// returned. `metadata` is decoded from the downstream terminal JSON metadata
/// when the export supplies it.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct LeanWorkerTypedStreamSummary<Summary> {
    /// Total row count for the stream.
    pub total_rows: u64,
    /// Row counts keyed by stream name.
    pub per_stream_counts: BTreeMap<String, u64>,
    /// Elapsed time for the streaming export.
    pub elapsed: Duration,
    /// Decoded terminal metadata, when the export provided any.
    pub metadata: Option<Summary>,
}
554
555/// Serializable elaboration result returned over the worker boundary.
556#[derive(Clone, Debug, Eq, PartialEq)]
557pub struct LeanWorkerElabResult {
558    pub success: bool,
559    pub diagnostics: Vec<LeanWorkerDiagnostic>,
560    pub truncated: bool,
561}
562
563impl From<WorkerElabOutcome> for LeanWorkerElabResult {
564    fn from(value: WorkerElabOutcome) -> Self {
565        Self {
566            success: value.success,
567            diagnostics: value.diagnostics.into_iter().map(Into::into).collect(),
568            truncated: value.truncated,
569        }
570    }
571}
572
/// Status of a kernel check as reported across the worker boundary.
///
/// Each variant corresponds to the wire-level `WorkerKernelStatus` variant of
/// the same name.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
#[non_exhaustive]
pub enum LeanWorkerKernelStatus {
    /// Mirrors `WorkerKernelStatus::Checked`.
    Checked,
    /// Mirrors `WorkerKernelStatus::Rejected`.
    Rejected,
    /// Mirrors `WorkerKernelStatus::Unavailable`.
    Unavailable,
    /// Mirrors `WorkerKernelStatus::Unsupported`.
    Unsupported,
}
582
583/// Serializable kernel-check result returned over the worker boundary.
584#[derive(Clone, Debug, Eq, PartialEq)]
585pub struct LeanWorkerKernelResult {
586    pub status: LeanWorkerKernelStatus,
587    pub diagnostics: Vec<LeanWorkerDiagnostic>,
588    pub truncated: bool,
589}
590
591impl From<WorkerKernelOutcome> for LeanWorkerKernelResult {
592    fn from(value: WorkerKernelOutcome) -> Self {
593        Self {
594            status: match value.status {
595                WorkerKernelStatus::Checked => LeanWorkerKernelStatus::Checked,
596                WorkerKernelStatus::Rejected => LeanWorkerKernelStatus::Rejected,
597                WorkerKernelStatus::Unavailable => LeanWorkerKernelStatus::Unavailable,
598                WorkerKernelStatus::Unsupported => LeanWorkerKernelStatus::Unsupported,
599            },
600            diagnostics: value.diagnostics.into_iter().map(Into::into).collect(),
601            truncated: value.truncated,
602        }
603    }
604}
605
606/// Serializable diagnostic returned by worker elaboration and kernel checks.
607#[derive(Clone, Debug, Eq, PartialEq)]
608pub struct LeanWorkerDiagnostic {
609    pub severity: String,
610    pub message: String,
611    pub file_label: String,
612    pub line: Option<u32>,
613    pub column: Option<u32>,
614    pub end_line: Option<u32>,
615    pub end_column: Option<u32>,
616}
617
618impl From<WorkerDiagnostic> for LeanWorkerDiagnostic {
619    fn from(value: WorkerDiagnostic) -> Self {
620        Self {
621            severity: value.severity,
622            message: value.message,
623            file_label: value.file_label,
624            line: value.line,
625            column: value.column,
626            end_line: value.end_line,
627            end_column: value.end_column,
628        }
629    }
630}
631
632/// Narrow host-session adapter over a live `LeanWorker`.
633///
634/// Dropping this value does not stop the worker. If a request is cancelled
635/// while in flight, the supervisor cycles the child process and this session is
636/// invalidated; open a fresh session before issuing more host requests.
637pub struct LeanWorkerSession<'worker> {
638    worker: &'worker mut LeanWorker,
639    open: bool,
640}
641
642impl LeanWorker {
643    /// Open a host session inside the worker child.
644    ///
645    /// # Errors
646    ///
647    /// Returns `LeanWorkerError` if the worker is dead, the child cannot open
648    /// the Lake project/capability/imports, cancellation is already requested,
649    /// or protocol communication fails.
650    pub fn open_session<'worker>(
651        &'worker mut self,
652        config: &LeanWorkerSessionConfig,
653        cancellation: Option<&LeanWorkerCancellationToken>,
654        progress: Option<&dyn LeanWorkerProgressSink>,
655    ) -> Result<LeanWorkerSession<'worker>, LeanWorkerError> {
656        self.open_worker_session(config, cancellation, progress)?;
657        Ok(LeanWorkerSession {
658            worker: self,
659            open: true,
660        })
661    }
662}
663
664impl LeanWorkerSession<'_> {
665    /// Return the timeout used for subsequent requests on this session.
666    #[must_use]
667    pub fn request_timeout(&self) -> Duration {
668        self.worker.request_timeout()
669    }
670
671    /// Change the timeout for subsequent requests on this session.
672    ///
673    /// A timeout is parent-enforced. If it fires, the supervisor kills and
674    /// replaces the child process and invalidates this session.
675    pub fn set_request_timeout(&mut self, timeout: Duration) {
676        self.worker.set_request_timeout(timeout);
677    }
678
679    /// Elaborate one term and return only process-safe success/diagnostic data.
680    ///
681    /// # Errors
682    ///
683    /// Returns `LeanWorkerError` if the worker is dead, the child reports a
684    /// host error, cancellation is observed, a progress sink panics, or protocol
685    /// communication fails.
686    pub fn elaborate(
687        &mut self,
688        source: &str,
689        options: &LeanWorkerElabOptions,
690        cancellation: Option<&LeanWorkerCancellationToken>,
691        progress: Option<&dyn LeanWorkerProgressSink>,
692    ) -> Result<LeanWorkerElabResult, LeanWorkerError> {
693        self.ensure_open()?;
694        match self.worker.worker_elaborate(source, options, cancellation, progress) {
695            Ok(value) => Ok(value),
696            Err(err @ (LeanWorkerError::Cancelled { .. } | LeanWorkerError::Timeout { .. })) => {
697                self.open = false;
698                Err(err)
699            }
700            Err(err) => Err(err),
701        }
702    }
703
704    /// Kernel-check one declaration and return only process-safe status/diagnostics.
705    ///
706    /// # Errors
707    ///
708    /// Returns `LeanWorkerError` if the worker is dead, the child reports a
709    /// host error, cancellation is observed, a progress sink panics, or protocol
710    /// communication fails.
711    pub fn kernel_check(
712        &mut self,
713        source: &str,
714        options: &LeanWorkerElabOptions,
715        cancellation: Option<&LeanWorkerCancellationToken>,
716        progress: Option<&dyn LeanWorkerProgressSink>,
717    ) -> Result<LeanWorkerKernelResult, LeanWorkerError> {
718        self.ensure_open()?;
719        match self.worker.worker_kernel_check(source, options, cancellation, progress) {
720            Ok(value) => Ok(value),
721            Err(err @ (LeanWorkerError::Cancelled { .. } | LeanWorkerError::Timeout { .. })) => {
722                self.open = false;
723                Err(err)
724            }
725            Err(err) => Err(err),
726        }
727    }
728
729    /// Query declaration kinds in bulk.
730    ///
731    /// # Errors
732    ///
733    /// Returns `LeanWorkerError` if the worker is dead, the child reports a
734    /// host error, cancellation is observed, a progress sink panics, or protocol
735    /// communication fails.
736    pub fn declaration_kinds(
737        &mut self,
738        names: &[&str],
739        cancellation: Option<&LeanWorkerCancellationToken>,
740        progress: Option<&dyn LeanWorkerProgressSink>,
741    ) -> Result<Vec<String>, LeanWorkerError> {
742        self.ensure_open()?;
743        match self.worker.worker_declaration_kinds(names, cancellation, progress) {
744            Ok(value) => Ok(value),
745            Err(err @ (LeanWorkerError::Cancelled { .. } | LeanWorkerError::Timeout { .. })) => {
746                self.open = false;
747                Err(err)
748            }
749            Err(err) => Err(err),
750        }
751    }
752
753    /// Render declaration names in bulk.
754    ///
755    /// # Errors
756    ///
757    /// Returns `LeanWorkerError` if the worker is dead, the child reports a
758    /// host error, cancellation is observed, a progress sink panics, or protocol
759    /// communication fails.
760    pub fn declaration_names(
761        &mut self,
762        names: &[&str],
763        cancellation: Option<&LeanWorkerCancellationToken>,
764        progress: Option<&dyn LeanWorkerProgressSink>,
765    ) -> Result<Vec<String>, LeanWorkerError> {
766        self.ensure_open()?;
767        match self.worker.worker_declaration_names(names, cancellation, progress) {
768            Ok(value) => Ok(value),
769            Err(err @ (LeanWorkerError::Cancelled { .. } | LeanWorkerError::Timeout { .. })) => {
770                self.open = false;
771                Err(err)
772            }
773            Err(err) => Err(err),
774        }
775    }
776
777    /// Run a downstream streaming export and deliver JSON rows to `rows`.
778    ///
779    /// The Lean export must have ABI
780    /// `String -> USize -> USize -> IO UInt8`. The child supplies the
781    /// callback handle and trampoline; the parent only sees validated
782    /// `LeanWorkerDataRow` values.
783    ///
784    /// # Errors
785    ///
786    /// Returns `LeanWorkerError` if the worker is dead, the child reports a
787    /// host or stream error, cancellation is observed, a sink panics, or
788    /// protocol communication fails. In-flight cancellation cycles the child
789    /// and invalidates this session.
790    pub fn run_data_stream(
791        &mut self,
792        export: &str,
793        request: &Value,
794        rows: &dyn LeanWorkerDataSink,
795        diagnostics: Option<&dyn LeanWorkerDiagnosticSink>,
796        cancellation: Option<&LeanWorkerCancellationToken>,
797        progress: Option<&dyn LeanWorkerProgressSink>,
798    ) -> Result<LeanWorkerStreamSummary, LeanWorkerError> {
799        self.ensure_open()?;
800        match self
801            .worker
802            .worker_run_data_stream(export, request, rows, diagnostics, cancellation, progress)
803        {
804            Ok(value) => Ok(value),
805            Err(err @ (LeanWorkerError::Cancelled { .. } | LeanWorkerError::Timeout { .. })) => {
806                self.open = false;
807                Err(err)
808            }
809            Err(err) => Err(err),
810        }
811    }
812
813    fn run_data_stream_raw(
814        &mut self,
815        export: &str,
816        request: &Value,
817        rows: &dyn LeanWorkerRawDataSink,
818        diagnostics: Option<&dyn LeanWorkerDiagnosticSink>,
819        cancellation: Option<&LeanWorkerCancellationToken>,
820        progress: Option<&dyn LeanWorkerProgressSink>,
821    ) -> Result<LeanWorkerStreamSummary, LeanWorkerError> {
822        self.ensure_open()?;
823        match self
824            .worker
825            .worker_run_data_stream_raw(export, request, rows, diagnostics, cancellation, progress)
826        {
827            Ok(value) => Ok(value),
828            Err(err @ (LeanWorkerError::Cancelled { .. } | LeanWorkerError::Timeout { .. })) => {
829                self.open = false;
830                Err(err)
831            }
832            Err(err) => Err(err),
833        }
834    }
835
836    /// Run a typed non-streaming downstream JSON command.
837    ///
838    /// The Lean export must have ABI `String -> IO String`. The request is
839    /// serialized from `Req`; the returned JSON string is decoded into `Resp`.
840    /// Use this for commands that return one terminal JSON value and no rows.
841    ///
842    /// # Errors
843    ///
844    /// Returns `LeanWorkerError` if request encoding fails, the worker is
845    /// dead, the session was invalidated, the export is missing, response
846    /// decoding fails, cancellation or timeout is observed, a progress sink
847    /// panics, or protocol communication fails.
848    pub fn run_json_command<Req, Resp>(
849        &mut self,
850        command: &LeanWorkerJsonCommand<Req, Resp>,
851        request: &Req,
852        cancellation: Option<&LeanWorkerCancellationToken>,
853        progress: Option<&dyn LeanWorkerProgressSink>,
854    ) -> Result<Resp, LeanWorkerError>
855    where
856        Req: Serialize,
857        Resp: DeserializeOwned,
858    {
859        self.ensure_open()?;
860        let request_json =
861            serde_json::to_string(request).map_err(|err| LeanWorkerError::TypedCommandRequestEncode {
862                export: command.export().to_owned(),
863                message: err.to_string(),
864            })?;
865        match self
866            .worker
867            .worker_json_command(command.export(), request_json, cancellation, progress)
868        {
869            Ok(response_json) => {
870                serde_json::from_str(&response_json).map_err(|err| LeanWorkerError::TypedCommandResponseDecode {
871                    export: command.export().to_owned(),
872                    message: err.to_string(),
873                })
874            }
875            Err(err @ (LeanWorkerError::Cancelled { .. } | LeanWorkerError::Timeout { .. })) => {
876                self.open = false;
877                Err(err)
878            }
879            Err(err) => Err(err),
880        }
881    }
882
883    /// Run a typed downstream streaming command.
884    ///
885    /// The Lean export must have ABI
886    /// `String -> USize -> USize -> IO UInt8`. The request is serialized from
887    /// `Req`; each row payload is decoded into `Row`; terminal metadata is
888    /// decoded into `Summary` when present. Raw-row access remains available
889    /// through `run_data_stream`.
890    ///
891    /// # Errors
892    ///
893    /// Returns `LeanWorkerError` if request encoding fails, row or summary
894    /// decoding fails, the worker is dead, the session was invalidated, the
895    /// export fails, cancellation or timeout is observed, a sink panics, or
896    /// protocol communication fails. Row decode errors include the stream and
897    /// sequence that identified the bad payload.
898    pub fn run_streaming_command<Req, Row, Summary>(
899        &mut self,
900        command: &LeanWorkerStreamingCommand<Req, Row, Summary>,
901        request: &Req,
902        rows: &dyn LeanWorkerTypedDataSink<Row>,
903        diagnostics: Option<&dyn LeanWorkerDiagnosticSink>,
904        cancellation: Option<&LeanWorkerCancellationToken>,
905        progress: Option<&dyn LeanWorkerProgressSink>,
906    ) -> Result<LeanWorkerTypedStreamSummary<Summary>, LeanWorkerError>
907    where
908        Req: Serialize,
909        Row: DeserializeOwned,
910        Summary: DeserializeOwned,
911    {
912        self.ensure_open()?;
913        let request_value =
914            serde_json::to_value(request).map_err(|err| LeanWorkerError::TypedCommandRequestEncode {
915                export: command.export().to_owned(),
916                message: err.to_string(),
917            })?;
918        let internal_cancellation = LeanWorkerCancellationToken::new();
919        let cancellation_for_stream = cancellation.unwrap_or(&internal_cancellation);
920        let typed_sink = TypedRawDataSink {
921            export: command.export(),
922            rows,
923            cancellation: cancellation_for_stream,
924            decode_error: std::sync::Mutex::new(None),
925        };
926
927        match self.run_data_stream_raw(
928            command.export(),
929            &request_value,
930            &typed_sink,
931            diagnostics,
932            Some(cancellation_for_stream),
933            progress,
934        ) {
935            Ok(summary) => {
936                if let Some(err) = typed_sink.take_decode_error() {
937                    return Err(err);
938                }
939                let metadata = summary
940                    .metadata
941                    .map(|metadata| {
942                        serde_json::from_value(metadata).map_err(|err| LeanWorkerError::TypedCommandSummaryDecode {
943                            export: command.export().to_owned(),
944                            message: err.to_string(),
945                        })
946                    })
947                    .transpose()?;
948                Ok(LeanWorkerTypedStreamSummary {
949                    total_rows: summary.total_rows,
950                    per_stream_counts: summary.per_stream_counts,
951                    elapsed: summary.elapsed,
952                    metadata,
953                })
954            }
955            Err(LeanWorkerError::Cancelled { .. }) => {
956                if let Some(err) = typed_sink.take_decode_error() {
957                    Err(err)
958                } else {
959                    self.open = false;
960                    Err(LeanWorkerError::Cancelled {
961                        operation: "worker_run_data_stream",
962                    })
963                }
964            }
965            Err(err) => Err(err),
966        }
967    }
968
969    /// Query generic metadata from a downstream capability export.
970    ///
971    /// The Lean export must have ABI `String -> IO String`. The request and
972    /// response strings are JSON, but callers receive a typed metadata
973    /// envelope rather than private protocol frames.
974    ///
975    /// # Errors
976    ///
977    /// Returns `LeanWorkerError` if the worker is dead, the session was
978    /// invalidated, the export is missing, request or response JSON is
979    /// malformed, cancellation or timeout is observed, a progress sink panics,
980    /// or protocol communication fails.
981    pub fn capability_metadata(
982        &mut self,
983        export: &str,
984        request: &Value,
985        cancellation: Option<&LeanWorkerCancellationToken>,
986        progress: Option<&dyn LeanWorkerProgressSink>,
987    ) -> Result<LeanWorkerCapabilityMetadata, LeanWorkerError> {
988        self.ensure_open()?;
989        match self
990            .worker
991            .worker_capability_metadata(export, request, cancellation, progress)
992        {
993            Ok(value) => Ok(value),
994            Err(err @ (LeanWorkerError::Cancelled { .. } | LeanWorkerError::Timeout { .. })) => {
995                self.open = false;
996                Err(err)
997            }
998            Err(err) => Err(err),
999        }
1000    }
1001
1002    /// Run a generic doctor check from a downstream capability export.
1003    ///
1004    /// The Lean export must have ABI `String -> IO String`. Doctor diagnostics
1005    /// are capability-layer facts; data rows remain reserved for downstream
1006    /// streaming payloads.
1007    ///
1008    /// # Errors
1009    ///
1010    /// Returns `LeanWorkerError` if the worker is dead, the session was
1011    /// invalidated, the export is missing, request or response JSON is
1012    /// malformed, cancellation or timeout is observed, a progress sink panics,
1013    /// or protocol communication fails.
1014    pub fn capability_doctor(
1015        &mut self,
1016        export: &str,
1017        request: &Value,
1018        cancellation: Option<&LeanWorkerCancellationToken>,
1019        progress: Option<&dyn LeanWorkerProgressSink>,
1020    ) -> Result<LeanWorkerDoctorReport, LeanWorkerError> {
1021        self.ensure_open()?;
1022        match self
1023            .worker
1024            .worker_capability_doctor(export, request, cancellation, progress)
1025        {
1026            Ok(value) => Ok(value),
1027            Err(err @ (LeanWorkerError::Cancelled { .. } | LeanWorkerError::Timeout { .. })) => {
1028                self.open = false;
1029                Err(err)
1030            }
1031            Err(err) => Err(err),
1032        }
1033    }
1034
1035    fn ensure_open(&self) -> Result<(), LeanWorkerError> {
1036        if self.open {
1037            Ok(())
1038        } else {
1039            Err(LeanWorkerError::UnsupportedRequest {
1040                operation: "worker_session_invalidated",
1041            })
1042        }
1043    }
1044}
1045
1046struct TypedRawDataSink<'a, Row> {
1047    export: &'a str,
1048    rows: &'a dyn LeanWorkerTypedDataSink<Row>,
1049    cancellation: &'a LeanWorkerCancellationToken,
1050    decode_error: std::sync::Mutex<Option<LeanWorkerError>>,
1051}
1052
1053impl<Row> TypedRawDataSink<'_, Row> {
1054    fn take_decode_error(&self) -> Option<LeanWorkerError> {
1055        self.decode_error.lock().ok().and_then(|mut guard| guard.take())
1056    }
1057}
1058
1059impl<Row> LeanWorkerRawDataSink for TypedRawDataSink<'_, Row>
1060where
1061    Row: DeserializeOwned,
1062{
1063    fn report(&self, row: LeanWorkerRawDataRow) {
1064        match serde_json::from_str(row.payload.get()) {
1065            Ok(payload) => self.rows.report(LeanWorkerTypedDataRow {
1066                stream: row.stream,
1067                sequence: row.sequence,
1068                payload,
1069            }),
1070            Err(err) => {
1071                if let Ok(mut guard) = self.decode_error.lock() {
1072                    *guard = Some(LeanWorkerError::TypedCommandRowDecode {
1073                        export: self.export.to_owned(),
1074                        stream: row.stream,
1075                        sequence: row.sequence,
1076                        message: err.to_string(),
1077                    });
1078                }
1079                self.cancellation.cancel();
1080            }
1081        }
1082    }
1083}
1084
1085pub(crate) fn check_cancelled(
1086    operation: &'static str,
1087    token: Option<&LeanWorkerCancellationToken>,
1088) -> Result<(), LeanWorkerError> {
1089    if token.is_some_and(LeanWorkerCancellationToken::is_cancelled) {
1090        Err(LeanWorkerError::Cancelled { operation })
1091    } else {
1092        Ok(())
1093    }
1094}
1095
1096pub(crate) fn report_parent_progress(
1097    sink: Option<&dyn LeanWorkerProgressSink>,
1098    event: LeanWorkerProgressEvent,
1099) -> Result<(), LeanWorkerError> {
1100    let Some(sink) = sink else {
1101        return Ok(());
1102    };
1103    std::panic::catch_unwind(std::panic::AssertUnwindSafe(|| sink.report(event))).map_err(|payload| {
1104        let message = if let Some(s) = payload.downcast_ref::<&str>() {
1105            (*s).to_owned()
1106        } else if let Some(s) = payload.downcast_ref::<String>() {
1107            s.clone()
1108        } else {
1109            "worker progress sink panicked".to_owned()
1110        };
1111        LeanWorkerError::ProgressPanic { message }
1112    })
1113}
1114
1115pub(crate) fn report_parent_data_row(
1116    sink: Option<LeanWorkerDataSinkTarget<'_>>,
1117    row: DataRow,
1118) -> Result<(), LeanWorkerError> {
1119    let Some(sink) = sink else {
1120        return Err(LeanWorkerError::Protocol {
1121            message: "worker sent data row for a request without a row sink".to_owned(),
1122        });
1123    };
1124    match sink {
1125        LeanWorkerDataSinkTarget::Value(sink) => {
1126            let row = LeanWorkerDataRow::try_from(row)?;
1127            report_value_data_row(sink, row)
1128        }
1129        LeanWorkerDataSinkTarget::Raw(sink) => report_raw_data_row(sink, row.into()),
1130    }
1131}
1132
1133fn report_value_data_row(sink: &dyn LeanWorkerDataSink, row: LeanWorkerDataRow) -> Result<(), LeanWorkerError> {
1134    std::panic::catch_unwind(std::panic::AssertUnwindSafe(|| sink.report(row))).map_err(|payload| {
1135        let message = if let Some(s) = payload.downcast_ref::<&str>() {
1136            (*s).to_owned()
1137        } else if let Some(s) = payload.downcast_ref::<String>() {
1138            s.clone()
1139        } else {
1140            "worker data sink panicked".to_owned()
1141        };
1142        LeanWorkerError::DataSinkPanic { message }
1143    })
1144}
1145
1146fn report_raw_data_row(sink: &dyn LeanWorkerRawDataSink, row: LeanWorkerRawDataRow) -> Result<(), LeanWorkerError> {
1147    std::panic::catch_unwind(std::panic::AssertUnwindSafe(|| sink.report(row))).map_err(|payload| {
1148        let message = if let Some(s) = payload.downcast_ref::<&str>() {
1149            (*s).to_owned()
1150        } else if let Some(s) = payload.downcast_ref::<String>() {
1151            s.clone()
1152        } else {
1153            "worker data sink panicked".to_owned()
1154        };
1155        LeanWorkerError::DataSinkPanic { message }
1156    })
1157}
1158
1159pub(crate) fn report_parent_diagnostic(
1160    sink: Option<&dyn LeanWorkerDiagnosticSink>,
1161    diagnostic: LeanWorkerDiagnosticEvent,
1162) -> Result<(), LeanWorkerError> {
1163    let Some(sink) = sink else {
1164        return Ok(());
1165    };
1166    std::panic::catch_unwind(std::panic::AssertUnwindSafe(|| sink.report(diagnostic))).map_err(|payload| {
1167        let message = if let Some(s) = payload.downcast_ref::<&str>() {
1168            (*s).to_owned()
1169        } else if let Some(s) = payload.downcast_ref::<String>() {
1170            s.clone()
1171        } else {
1172            "worker diagnostic sink panicked".to_owned()
1173        };
1174        LeanWorkerError::DiagnosticSinkPanic { message }
1175    })
1176}
1177
1178pub(crate) fn elapsed_event(
1179    phase: String,
1180    current: u64,
1181    total: Option<u64>,
1182    started: Instant,
1183) -> LeanWorkerProgressEvent {
1184    LeanWorkerProgressEvent {
1185        phase,
1186        current,
1187        total,
1188        elapsed: started.elapsed(),
1189    }
1190}