Skip to main content

lean_rs_worker_protocol/
protocol.rs

1//! Length-delimited frame codec and message payload types for the
2//! parent ↔ child worker process boundary.
3//!
4//! ## Additive evolution
5//!
6//! Every public enum here is `#[non_exhaustive]` so the wire format can gain
7//! a new request, response, or message kind without forcing a semver-major
8//! bump on consumers. Most structs are also `#[non_exhaustive]` and expose
9//! `pub fn new(...)` constructors so the shapes can grow fields without
10//! breaking external builders. The exception is [`DataRow`], which is built
11//! so frequently with struct-literal syntax (tests, harnesses, fakes) that
12//! the ergonomic cost of `#[non_exhaustive]` outweighs the additive-evolution
13//! benefit; the wire schema for a data row is also already fixed by the
14//! stream contract.
15
16use std::collections::BTreeMap;
17use std::fmt;
18use std::io::{self, Read, Write};
19use std::time::Duration;
20
21use serde::{Deserialize, Serialize};
22use serde_json::Value;
23use serde_json::value::RawValue;
24
25use crate::types::{
26    LeanWorkerCapabilityMetadata, LeanWorkerDeclarationFilter, LeanWorkerDeclarationInspectionRequest,
27    LeanWorkerDeclarationInspectionResult, LeanWorkerDeclarationRow, LeanWorkerDeclarationSearch,
28    LeanWorkerDeclarationSearchResult, LeanWorkerDeclarationType, LeanWorkerDeclarationVerificationRequest,
29    LeanWorkerDeclarationVerificationResult, LeanWorkerDoctorReport, LeanWorkerElabOptions, LeanWorkerElabResult,
30    LeanWorkerKernelResult, LeanWorkerMetaResult, LeanWorkerMetaTransparency, LeanWorkerModuleQuery,
31    LeanWorkerModuleQueryBatchOutcome, LeanWorkerModuleQueryOutcome, LeanWorkerModuleQuerySelector,
32    LeanWorkerOutputBudgets, LeanWorkerProofAttemptRequest, LeanWorkerProofAttemptResult, LeanWorkerRendered,
33};
34
/// Wire protocol version negotiated between parent and child during the
/// handshake frame. Bump only on a breaking wire change.
///
/// [`Frame::new`] stamps this value on every outgoing frame and
/// [`read_frame`] rejects any incoming frame whose `version` differs.
pub const PROTOCOL_VERSION: u16 = 7;

/// Default per-frame size limit (1 MiB) applied by the parent when no
/// explicit cap is configured on the capability builder.
///
/// The cap is a parent-side policy decision negotiated to the child at
/// handshake time via [`Message::ConfigureFrameLimit`]. Both [`write_frame`]
/// and [`read_frame`] reject frames whose serialised JSON payload exceeds the
/// cap passed in, so a runaway producer cannot make the peer allocate without
/// bound. The cap is per-connection—set once at handshake, applied to every
/// subsequent frame in both directions.
pub const MAX_FRAME_BYTES: u32 = 1024 * 1024;

/// Floor on the configurable frame cap (64 KiB). Trivial requests and the
/// handshake itself must fit inside this; callers cannot configure smaller.
pub const MIN_FRAME_BYTES: u32 = 64 * 1024;

/// Ceiling on the configurable frame cap (256 MiB). Prevents callers from
/// defeating the memory-safety policy by passing an absurdly large value.
pub const MAX_FRAME_BYTES_HARD_CAP: u32 = 256 * 1024 * 1024;
57
58/// Versioned envelope around a single protocol [`Message`].
59#[derive(Clone, Debug, Deserialize, Eq, PartialEq, Serialize)]
60#[non_exhaustive]
61pub struct Frame {
62    /// Protocol version the sender used. Receivers reject mismatches.
63    pub version: u16,
64    /// Inner message payload.
65    pub message: Message,
66}
67
68impl Frame {
69    /// Wrap `message` in a frame tagged with the current [`PROTOCOL_VERSION`].
70    #[must_use]
71    pub fn new(message: Message) -> Self {
72        Self {
73            version: PROTOCOL_VERSION,
74            message,
75        }
76    }
77}
78
/// One protocol message exchanged over the worker boundary.
///
/// Serialised in serde's adjacently tagged form: the variant name, in
/// `snake_case`, is stored under the `"type"` key and the variant's payload
/// under the `"body"` key.
#[derive(Clone, Debug, Deserialize, Eq, PartialEq, Serialize)]
#[serde(tag = "type", content = "body", rename_all = "snake_case")]
#[non_exhaustive]
pub enum Message {
    /// Sent by the child immediately after spawn to advertise its version and
    /// supported protocol revision.
    Handshake {
        /// `lean-rs-worker-child` package version.
        worker_version: String,
        /// Protocol version the child speaks. Parent rejects mismatches.
        protocol_version: u16,
    },
    /// Sent by the parent immediately after the handshake frame to negotiate
    /// the per-frame size cap for the remainder of this connection.
    ///
    /// The parent owns the memory-safety policy: it clamps the requested cap
    /// into <code>[[MIN_FRAME_BYTES], [MAX_FRAME_BYTES_HARD_CAP]]</code>
    /// before sending. The child installs the value as-is.
    ConfigureFrameLimit {
        /// Per-frame byte cap applied in both directions for this connection.
        max_frame_bytes: u32,
    },
    /// Parent → child request frame.
    Request(Request),
    /// Child → parent terminal response for one request.
    Response(Response),
    /// Child → parent intermediate diagnostic frame.
    Diagnostic(Diagnostic),
    /// Child → parent intermediate progress frame.
    ProgressTick(ProgressTick),
    /// Child → parent streaming data row frame.
    DataRow(DataRow),
    /// Child → parent fatal exit notification carrying the captured stderr
    /// just before the child process tears down.
    FatalExit(FatalExit),
}
116
/// Parent-issued worker request body.
///
/// Serialised in serde's internally tagged form: the variant name, in
/// `snake_case`, is stored under the `"op"` key alongside the variant's own
/// fields.
///
/// NOTE(review): several variants carry a `progress: bool` flag; it
/// presumably opts the request into intermediate [`ProgressTick`] frames —
/// confirm against the child dispatcher, which is not part of this file.
#[derive(Clone, Debug, Deserialize, Eq, PartialEq, Serialize)]
#[serde(tag = "op", rename_all = "snake_case")]
#[non_exhaustive]
pub enum Request {
    /// Field-less health request.
    Health,
    /// Fixture request identified by a manifest path.
    LoadFixtureCapability {
        manifest_path: String,
    },
    /// Fixture request carrying two `u64` operands next to a manifest path.
    CallFixtureMul {
        manifest_path: String,
        lhs: u64,
        rhs: u64,
    },
    /// Fixture request identified by a manifest path.
    /// NOTE(review): presumably forces a Lean-side panic for crash-handling
    /// tests — confirm against the child implementation.
    TriggerLeanPanic {
        manifest_path: String,
    },
    /// Open a host session for `project_root` using the given
    /// [`HostSessionMode`] and import list.
    OpenHostSession {
        project_root: String,
        mode: HostSessionMode,
        imports: Vec<String>,
    },
    /// Source text plus elaboration options.
    Elaborate {
        source: String,
        options: LeanWorkerElabOptions,
    },
    /// Source text plus elaboration options, with a progress flag.
    KernelCheck {
        source: String,
        options: LeanWorkerElabOptions,
        progress: bool,
    },
    /// List of names, with a progress flag.
    DeclarationKinds {
        names: Vec<String>,
        progress: bool,
    },
    /// List of names, with a progress flag.
    DeclarationNames {
        names: Vec<String>,
        progress: bool,
    },
    /// Export name plus a request string, with a progress flag.
    /// NOTE(review): `request_json` is presumably JSON text forwarded to the
    /// named export — confirm against the child implementation.
    RunDataStream {
        export: String,
        request_json: String,
        progress: bool,
    },
    /// Export name plus a request string.
    CapabilityMetadata {
        export: String,
        request_json: String,
    },
    /// Export name plus a request string.
    CapabilityDoctor {
        export: String,
        request_json: String,
    },
    /// Export name plus a request string.
    JsonCommand {
        export: String,
        request_json: String,
    },
    /// Source text plus elaboration options.
    InferType {
        source: String,
        options: LeanWorkerElabOptions,
    },
    /// Source text plus elaboration options.
    Whnf {
        source: String,
        options: LeanWorkerElabOptions,
    },
    /// Two source strings compared under the given transparency setting and
    /// elaboration options.
    IsDefEq {
        lhs: String,
        rhs: String,
        transparency: LeanWorkerMetaTransparency,
        options: LeanWorkerElabOptions,
    },
    /// Single declaration name.
    Describe {
        name: String,
    },
    /// Declaration search description.
    SearchDeclarations {
        search: LeanWorkerDeclarationSearch,
    },
    /// Single declaration name plus a byte budget.
    /// NOTE(review): `max_bytes` presumably bounds the rendered type text —
    /// confirm against the child implementation.
    DeclarationType {
        name: String,
        max_bytes: usize,
    },
    /// Declaration inspection request payload.
    InspectDeclaration {
        request: LeanWorkerDeclarationInspectionRequest,
    },
    /// Proof attempt request payload plus elaboration options, with a
    /// progress flag.
    AttemptProof {
        request: LeanWorkerProofAttemptRequest,
        options: LeanWorkerElabOptions,
        progress: bool,
    },
    /// Declaration verification request payload plus elaboration options,
    /// with a progress flag.
    VerifyDeclaration {
        request: LeanWorkerDeclarationVerificationRequest,
        options: LeanWorkerElabOptions,
        progress: bool,
    },
    /// Declaration filter, with a progress flag.
    ListDeclarationsStrings {
        filter: LeanWorkerDeclarationFilter,
        progress: bool,
    },
    /// List of names, with a progress flag.
    DescribeBulk {
        names: Vec<String>,
        progress: bool,
    },
    /// Source text, one module query, and elaboration options.
    ProcessModuleQuery {
        source: String,
        query: LeanWorkerModuleQuery,
        options: LeanWorkerElabOptions,
    },
    /// Source text, a list of query selectors, output budgets, and
    /// elaboration options.
    ProcessModuleQueryBatch {
        source: String,
        selectors: Vec<LeanWorkerModuleQuerySelector>,
        budgets: LeanWorkerOutputBudgets,
        options: LeanWorkerElabOptions,
    },
    /// Field-less request named for clearing the module snapshot cache.
    ClearModuleSnapshotCache,
    // Private harness requests that exercise streaming frame behavior.
    // Not part of the public row sink API.
    EmitTestRows {
        streams: Vec<String>,
    },
    EmitTestRowsThenExit,
    EmitTestRowsThenPanic,
    /// Field-less termination request.
    Terminate,
}
239
/// How the worker child should load host-session capabilities.
///
/// Serialised in serde's internally tagged form: the variant name, in
/// `snake_case`, is stored under the `"kind"` key alongside the variant's
/// own fields.
#[derive(Clone, Debug, Deserialize, Eq, PartialEq, Serialize)]
#[serde(tag = "kind", rename_all = "snake_case")]
#[non_exhaustive]
pub enum HostSessionMode {
    /// Open a user capability dylib and the bundled host shims.
    Capability {
        // NOTE(review): `package` and `lib_name` presumably identify the
        // capability dylib to open — confirm against the child loader.
        package: String,
        lib_name: String,
        // Optional manifest path; `None` when the caller supplies none.
        manifest_path: Option<String>,
    },
    /// Open only the bundled host shims.
    ShimsOnly,
}
254
/// Child-issued terminal response body for one [`Request`].
///
/// Serialised in serde's internally tagged form: the variant name, in
/// `snake_case`, is stored under the `"status"` key alongside the variant's
/// own fields.
///
/// NOTE(review): which request produces which response is decided by the
/// child dispatcher, which is not part of this file; the pairing suggested by
/// the variant names should be confirmed there.
#[derive(Clone, Debug, Deserialize, Eq, PartialEq, Serialize)]
#[serde(tag = "status", rename_all = "snake_case")]
#[non_exhaustive]
pub enum Response {
    /// Field-less health acknowledgement.
    HealthOk,
    /// Field-less capability-loaded acknowledgement.
    CapabilityLoaded,
    /// Single `u64` result value.
    U64 {
        value: u64,
    },
    /// Field-less host-session-opened acknowledgement.
    HostSessionOpened,
    /// Elaboration outcome.
    Elaboration {
        outcome: LeanWorkerElabResult,
    },
    /// Kernel check outcome.
    KernelCheck {
        outcome: LeanWorkerKernelResult,
    },
    /// List of string values.
    Strings {
        values: Vec<String>,
    },
    /// Terminal summary for a completed stream.
    StreamComplete {
        summary: StreamSummary,
    },
    /// Stream export failure carrying a raw status byte.
    StreamExportFailed {
        status_byte: u8,
    },
    /// Stream callback failure carrying a raw status byte and a description.
    StreamCallbackFailed {
        status_byte: u8,
        description: String,
    },
    /// Malformed stream row, described by `message`.
    StreamRowMalformed {
        message: String,
    },
    /// Capability metadata payload.
    CapabilityMetadata {
        metadata: LeanWorkerCapabilityMetadata,
    },
    /// Capability doctor report payload.
    CapabilityDoctor {
        report: LeanWorkerDoctorReport,
    },
    /// Malformed capability metadata, described by `message`.
    CapabilityMetadataMalformed {
        message: String,
    },
    /// Malformed capability doctor report, described by `message`.
    CapabilityDoctorMalformed {
        message: String,
    },
    /// Response string for a JSON command.
    /// NOTE(review): `response_json` is presumably JSON text — confirm
    /// against the child implementation.
    JsonCommand {
        response_json: String,
    },
    /// Meta-level result carrying a rendered expression.
    MetaExpr {
        result: LeanWorkerMetaResult<LeanWorkerRendered>,
    },
    /// Meta-level result carrying a boolean.
    MetaBool {
        result: LeanWorkerMetaResult<bool>,
    },
    /// Optional declaration row.
    Declaration {
        row: Option<LeanWorkerDeclarationRow>,
    },
    /// Declaration search result.
    DeclarationSearch {
        result: LeanWorkerDeclarationSearchResult,
    },
    /// Optional declaration type row.
    DeclarationType {
        row: Option<LeanWorkerDeclarationType>,
    },
    /// Declaration inspection result.
    DeclarationInspection {
        result: LeanWorkerDeclarationInspectionResult,
    },
    /// Proof attempt result.
    ProofAttempt {
        result: LeanWorkerProofAttemptResult,
    },
    /// Declaration verification result.
    DeclarationVerification {
        result: LeanWorkerDeclarationVerificationResult,
    },
    /// List of declaration rows.
    DeclarationBulk {
        rows: Vec<LeanWorkerDeclarationRow>,
    },
    /// Outcome of a single module query.
    ProcessModuleQuery {
        outcome: LeanWorkerModuleQueryOutcome,
    },
    /// Outcome of a module query batch.
    ProcessModuleQueryBatch {
        outcome: LeanWorkerModuleQueryBatchOutcome,
    },
    /// Result of clearing the module snapshot cache.
    ModuleSnapshotCacheCleared {
        result: crate::types::LeanWorkerModuleSnapshotCacheClearResult,
    },
    /// Row count reported at completion.
    RowsComplete {
        count: u64,
    },
    /// Field-less terminating acknowledgement.
    Terminating,
    /// Error response carrying a `code` string and a `message` string.
    Error {
        code: String,
        message: String,
    },
}
348
349/// Intermediate diagnostic frame emitted by the child during a request.
350#[derive(Clone, Debug, Deserialize, Eq, PartialEq, Serialize)]
351#[non_exhaustive]
352pub struct Diagnostic {
353    /// Stable diagnostic code identifier.
354    pub code: String,
355    /// Bounded human-readable diagnostic message.
356    pub message: String,
357}
358
359impl Diagnostic {
360    /// Build a diagnostic frame payload.
361    #[must_use]
362    pub fn new(code: impl Into<String>, message: impl Into<String>) -> Self {
363        Self {
364            code: code.into(),
365            message: message.into(),
366        }
367    }
368}
369
370/// Intermediate progress frame emitted by the child during a long-running
371/// request.
372#[derive(Clone, Debug, Deserialize, Eq, PartialEq, Serialize)]
373#[non_exhaustive]
374pub struct ProgressTick {
375    /// Phase name the child is reporting progress for.
376    pub phase: String,
377    /// Items completed so far in this phase.
378    pub current: u64,
379    /// Total expected items in this phase, if known.
380    pub total: Option<u64>,
381}
382
383impl ProgressTick {
384    /// Build a progress-tick frame payload.
385    #[must_use]
386    pub fn new(phase: impl Into<String>, current: u64, total: Option<u64>) -> Self {
387        Self {
388            phase: phase.into(),
389            current,
390            total,
391        }
392    }
393}
394
395/// One row in a streaming response.
396///
397/// Construction goes through [`DataRowEmitter::next`] in the child runtime;
398/// direct struct-literal construction is permitted in tests and harnesses.
399/// This struct intentionally stays exhaustive: see the module-level note on
400/// additive evolution.
401#[derive(Clone, Debug, Deserialize, Serialize)]
402pub struct DataRow {
403    /// Logical stream this row belongs to.
404    pub stream: String,
405    /// Per-stream monotonically increasing sequence number.
406    pub sequence: u64,
407    /// Opaque JSON payload (deserialised lazily by the parent).
408    pub payload: Box<RawValue>,
409}
410
411impl PartialEq for DataRow {
412    fn eq(&self, other: &Self) -> bool {
413        self.stream == other.stream && self.sequence == other.sequence && self.payload.get() == other.payload.get()
414    }
415}
416
417impl Eq for DataRow {}
418
419/// Terminal stream-completion summary returned alongside a streaming response.
420#[derive(Clone, Debug, Deserialize, Eq, PartialEq, Serialize)]
421#[non_exhaustive]
422pub struct StreamSummary {
423    /// Total rows emitted across all streams.
424    pub total_rows: u64,
425    /// Per-stream row counts at completion.
426    pub per_stream_counts: BTreeMap<String, u64>,
427    /// Child-side elapsed time in microseconds.
428    pub elapsed_micros: u64,
429    /// Optional downstream-defined terminal metadata.
430    pub metadata: Option<Value>,
431}
432
433impl StreamSummary {
434    /// Build a stream-completion summary, clamping the elapsed duration into
435    /// the `u64` micros field.
436    #[must_use]
437    pub fn new(
438        total_rows: u64,
439        per_stream_counts: BTreeMap<String, u64>,
440        elapsed: Duration,
441        metadata: Option<Value>,
442    ) -> Self {
443        Self {
444            total_rows,
445            per_stream_counts,
446            elapsed_micros: elapsed.as_micros().try_into().unwrap_or(u64::MAX),
447            metadata,
448        }
449    }
450}
451
452/// Stateful emitter that assigns per-stream sequence numbers and tracks the
453/// running row count for the terminal [`StreamSummary`].
454#[derive(Debug, Default)]
455#[non_exhaustive]
456pub struct DataRowEmitter {
457    sequences: BTreeMap<String, u64>,
458    count: u64,
459}
460
461impl DataRowEmitter {
462    /// Allocate the next [`DataRow`] for `stream`, advancing the per-stream
463    /// sequence and the overall count.
464    pub fn next(&mut self, stream: impl Into<String>, payload: Box<RawValue>) -> DataRow {
465        let stream = stream.into();
466        let sequence = self.sequences.entry(stream.clone()).or_insert(0);
467        let row = DataRow {
468            stream,
469            sequence: *sequence,
470            payload,
471        };
472        *sequence = sequence.saturating_add(1);
473        self.count = self.count.saturating_add(1);
474        row
475    }
476
477    #[cfg(test)]
478    fn emit(
479        &mut self,
480        writer: &mut impl Write,
481        stream: impl Into<String>,
482        payload: &Value,
483    ) -> Result<(), ProtocolError> {
484        let row = self.next(stream, serde_json::value::to_raw_value(payload)?);
485        write_frame(writer, Message::DataRow(row), MAX_FRAME_BYTES)
486    }
487
488    /// Total rows emitted across all streams.
489    #[must_use]
490    pub fn count(&self) -> u64 {
491        self.count
492    }
493
494    /// Snapshot of per-stream row counts.
495    #[must_use]
496    pub fn per_stream_counts(&self) -> BTreeMap<String, u64> {
497        self.sequences.clone()
498    }
499}
500
501/// Final frame the child writes before it tears down on an unrecoverable
502/// failure.
503#[derive(Clone, Debug, Deserialize, Eq, PartialEq, Serialize)]
504#[non_exhaustive]
505pub struct FatalExit {
506    /// Stringified `ExitStatus` of the child process.
507    pub status: String,
508    /// Captured stderr tail at fatal-exit time.
509    pub stderr: String,
510}
511
512impl FatalExit {
513    /// Build a fatal-exit frame payload.
514    #[must_use]
515    pub fn new(status: impl Into<String>, stderr: impl Into<String>) -> Self {
516        Self {
517            status: status.into(),
518            stderr: stderr.into(),
519        }
520    }
521}
522
/// Failure modes the codec can produce while reading or writing a frame.
#[derive(Debug)]
#[non_exhaustive]
pub enum ProtocolError {
    /// Underlying I/O failure (pipe closed, partial read, etc.).
    Io(io::Error),
    /// JSON serialisation or deserialisation failure.
    Json(serde_json::Error),
    /// A frame body exceeded the cap passed to [`write_frame`] or
    /// [`read_frame`]. That cap is [`MAX_FRAME_BYTES`] only when the caller
    /// passes the default.
    FrameTooLarge {
        /// Observed frame size in bytes. [`write_frame`] reports `u32::MAX`
        /// here when the serialised body is too long to fit in a `u32`.
        len: u32,
        /// Maximum allowed frame size that was in force for the call.
        max: u32,
    },
    /// Peer's frame version did not match this binary's [`PROTOCOL_VERSION`].
    VersionMismatch {
        /// Version this binary expected.
        expected: u16,
        /// Version the peer used.
        actual: u16,
    },
}
546
547impl ProtocolError {
548    /// Whether the underlying I/O error indicates the peer's pipe was closed
549    /// (`UnexpectedEof`). Used by callers to distinguish a clean fatal exit
550    /// from a true protocol failure.
551    #[must_use]
552    pub fn is_eof(&self) -> bool {
553        matches!(self, Self::Io(err) if err.kind() == io::ErrorKind::UnexpectedEof)
554    }
555}
556
557impl fmt::Display for ProtocolError {
558    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
559        match self {
560            Self::Io(err) => write!(f, "worker protocol I/O failed: {err}"),
561            Self::Json(err) => write!(f, "worker protocol JSON decode failed: {err}"),
562            Self::FrameTooLarge { len, max } => {
563                write!(f, "worker protocol frame too large: {len} bytes exceeds {max}")
564            }
565            Self::VersionMismatch { expected, actual } => {
566                write!(
567                    f,
568                    "worker protocol version mismatch: expected {expected}, received {actual}"
569                )
570            }
571        }
572    }
573}
574
575impl std::error::Error for ProtocolError {}
576
577impl From<io::Error> for ProtocolError {
578    fn from(value: io::Error) -> Self {
579        Self::Io(value)
580    }
581}
582
583impl From<serde_json::Error> for ProtocolError {
584    fn from(value: serde_json::Error) -> Self {
585        Self::Json(value)
586    }
587}
588
589/// Serialise `message` as a length-delimited JSON frame to `writer`.
590///
591/// `max_frame_bytes` is the per-frame cap negotiated for this connection.
592/// Until the handshake completes, callers pass [`MAX_FRAME_BYTES`] as the
593/// default; afterwards the supervisor passes the
594/// [`Message::ConfigureFrameLimit`] value installed on the connection.
595///
596/// # Errors
597///
598/// Returns [`ProtocolError::FrameTooLarge`] if the serialised body would
599/// exceed `max_frame_bytes`, or the underlying [`ProtocolError::Io`] /
600/// [`ProtocolError::Json`] for codec failures.
601pub fn write_frame(writer: &mut impl Write, message: Message, max_frame_bytes: u32) -> Result<(), ProtocolError> {
602    let bytes = serde_json::to_vec(&Frame::new(message))?;
603    let len = u32::try_from(bytes.len()).map_err(|_| ProtocolError::FrameTooLarge {
604        len: u32::MAX,
605        max: max_frame_bytes,
606    })?;
607    if len > max_frame_bytes {
608        return Err(ProtocolError::FrameTooLarge {
609            len,
610            max: max_frame_bytes,
611        });
612    }
613    writer.write_all(&len.to_be_bytes())?;
614    writer.write_all(&bytes)?;
615    writer.flush()?;
616    Ok(())
617}
618
619/// Read one length-delimited JSON frame from `reader`.
620///
621/// `max_frame_bytes` is the per-frame cap negotiated for this connection. See
622/// [`write_frame`] for the back-compat default and post-handshake semantics.
623///
624/// # Errors
625///
626/// Returns [`ProtocolError::FrameTooLarge`] if the framed length exceeds
627/// `max_frame_bytes` (rejected before allocation),
628/// [`ProtocolError::VersionMismatch`] if the peer's version does not match
629/// [`PROTOCOL_VERSION`], or the underlying [`ProtocolError::Io`] /
630/// [`ProtocolError::Json`] for codec failures.
631pub fn read_frame(reader: &mut impl Read, max_frame_bytes: u32) -> Result<Frame, ProtocolError> {
632    let mut len_bytes = [0_u8; 4];
633    reader.read_exact(&mut len_bytes)?;
634    let len = u32::from_be_bytes(len_bytes);
635    if len > max_frame_bytes {
636        return Err(ProtocolError::FrameTooLarge {
637            len,
638            max: max_frame_bytes,
639        });
640    }
641    let mut bytes = vec![0_u8; len as usize];
642    reader.read_exact(&mut bytes)?;
643    let frame: Frame = serde_json::from_slice(&bytes)?;
644    if frame.version != PROTOCOL_VERSION {
645        return Err(ProtocolError::VersionMismatch {
646            expected: PROTOCOL_VERSION,
647            actual: frame.version,
648        });
649    }
650    Ok(frame)
651}
652
653#[cfg(test)]
654mod tests {
655    #![allow(clippy::expect_used, clippy::panic)]
656
657    use std::io::Cursor;
658
659    use serde_json::json;
660    use serde_json::value::RawValue;
661
662    use super::{
663        DataRow, DataRowEmitter, MAX_FRAME_BYTES, MAX_FRAME_BYTES_HARD_CAP, MIN_FRAME_BYTES, Message, ProtocolError,
664        Request, Response, read_frame, write_frame,
665    };
666    use crate::types::{
667        LeanWorkerDeclarationFilter, LeanWorkerDeclarationFlags, LeanWorkerDeclarationInspection,
668        LeanWorkerDeclarationInspectionFields, LeanWorkerDeclarationInspectionRequest,
669        LeanWorkerDeclarationInspectionResult, LeanWorkerDeclarationNameMatch, LeanWorkerDeclarationProofSearchFacts,
670        LeanWorkerDeclarationSearch, LeanWorkerDeclarationSearchBias, LeanWorkerDeclarationSearchFacts,
671        LeanWorkerDeclarationSearchPruning, LeanWorkerDeclarationSearchResult, LeanWorkerDeclarationSearchRow,
672        LeanWorkerDeclarationSearchScope, LeanWorkerDeclarationSearchTimings, LeanWorkerDeclarationTargetInfo,
673        LeanWorkerDeclarationVerificationFacts, LeanWorkerDeclarationVerificationRequest,
674        LeanWorkerDeclarationVerificationResult, LeanWorkerDeclarationVerificationStatus,
675        LeanWorkerDeclarationVerificationTarget, LeanWorkerElabFailure, LeanWorkerElabOptions,
676        LeanWorkerModuleCacheStatus, LeanWorkerModuleQuery, LeanWorkerModuleQueryBatchEnvelope,
677        LeanWorkerModuleQueryBatchItem, LeanWorkerModuleQueryBatchOutcome, LeanWorkerModuleQueryBatchResult,
678        LeanWorkerModuleQueryCacheFacts, LeanWorkerModuleQueryOutcome, LeanWorkerModuleQueryResult,
679        LeanWorkerModuleQuerySelector, LeanWorkerModuleQueryTimings, LeanWorkerModuleSourceSpan,
680        LeanWorkerOutputBudgets, LeanWorkerProofAttemptEnvelope, LeanWorkerProofAttemptRequest,
681        LeanWorkerProofAttemptResult, LeanWorkerProofAttemptRow, LeanWorkerProofAttemptStatus,
682        LeanWorkerProofCandidate, LeanWorkerProofEditTarget, LeanWorkerProofPositionSelector,
683        LeanWorkerProofPositionSummary, LeanWorkerRenderedInfo, LeanWorkerSorryPolicy, LeanWorkerSourceRange,
684        LeanWorkerTypeAtResult,
685    };
686
687    fn raw_json(value: &serde_json::Value) -> Box<RawValue> {
688        serde_json::value::to_raw_value(value).expect("test JSON converts to raw value")
689    }
690
691    #[test]
692    fn data_row_round_trips_through_length_delimited_frame() {
693        let row = DataRow {
694            stream: "rows".to_owned(),
695            sequence: 7,
696            payload: raw_json(&json!({ "name": "Nat.add", "score": 3 })),
697        };
698        let mut bytes = Vec::new();
699        write_frame(&mut bytes, Message::DataRow(row.clone()), MAX_FRAME_BYTES).expect("data row writes");
700        let frame = read_frame(&mut Cursor::new(bytes), MAX_FRAME_BYTES).expect("data row reads");
701        assert_eq!(frame.message, Message::DataRow(row));
702    }
703
704    #[test]
705    fn data_row_emitter_assigns_per_stream_sequences() {
706        let mut emitter = DataRowEmitter::default();
707        let mut bytes = Vec::new();
708        emitter
709            .emit(&mut bytes, "rows", &json!({ "i": 0 }))
710            .expect("first row writes");
711        emitter
712            .emit(&mut bytes, "warnings", &json!({ "i": 1 }))
713            .expect("second row writes");
714        emitter
715            .emit(&mut bytes, "rows", &json!({ "i": 2 }))
716            .expect("third row writes");
717        assert_eq!(emitter.count(), 3);
718
719        let mut cursor = Cursor::new(bytes);
720        let rows = [
721            read_frame(&mut cursor, MAX_FRAME_BYTES).expect("first row reads"),
722            read_frame(&mut cursor, MAX_FRAME_BYTES).expect("second row reads"),
723            read_frame(&mut cursor, MAX_FRAME_BYTES).expect("third row reads"),
724        ];
725        assert_eq!(
726            rows.map(|frame| frame.message),
727            [
728                Message::DataRow(DataRow {
729                    stream: "rows".to_owned(),
730                    sequence: 0,
731                    payload: raw_json(&json!({ "i": 0 })),
732                }),
733                Message::DataRow(DataRow {
734                    stream: "warnings".to_owned(),
735                    sequence: 0,
736                    payload: raw_json(&json!({ "i": 1 })),
737                }),
738                Message::DataRow(DataRow {
739                    stream: "rows".to_owned(),
740                    sequence: 1,
741                    payload: raw_json(&json!({ "i": 2 })),
742                }),
743            ],
744        );
745    }
746
747    #[test]
748    fn oversized_data_row_is_rejected_before_write() {
749        let row = DataRow {
750            stream: "rows".to_owned(),
751            sequence: 0,
752            payload: raw_json(&json!({ "blob": "x".repeat(MAX_FRAME_BYTES as usize) })),
753        };
754        let mut bytes = Vec::new();
755        let err =
756            write_frame(&mut bytes, Message::DataRow(row), MAX_FRAME_BYTES).expect_err("oversized frame is rejected");
757        match err {
758            ProtocolError::FrameTooLarge { len, max } => {
759                assert!(len > max);
760                assert_eq!(max, MAX_FRAME_BYTES);
761            }
762            other @ (ProtocolError::Io(_) | ProtocolError::Json(_) | ProtocolError::VersionMismatch { .. }) => {
763                panic!("expected FrameTooLarge, got {other:?}");
764            }
765        }
766    }
767
768    #[test]
769    fn oversized_data_row_is_rejected_before_read_allocation() {
770        let mut bytes = Vec::new();
771        bytes.extend_from_slice(&(MAX_FRAME_BYTES.saturating_add(1)).to_be_bytes());
772        let err = read_frame(&mut Cursor::new(bytes), MAX_FRAME_BYTES).expect_err("oversized frame is rejected");
773        match err {
774            ProtocolError::FrameTooLarge { len, max } => {
775                assert_eq!(len, MAX_FRAME_BYTES + 1);
776                assert_eq!(max, MAX_FRAME_BYTES);
777            }
778            other @ (ProtocolError::Io(_) | ProtocolError::Json(_) | ProtocolError::VersionMismatch { .. }) => {
779                panic!("expected FrameTooLarge, got {other:?}");
780            }
781        }
782    }
783
784    #[test]
785    fn larger_cap_accepts_frame_rejected_under_default() {
786        // A 2 MiB payload is rejected under MAX_FRAME_BYTES (1 MiB) but
787        // accepted when the cap is raised—proving the cap parameter is the
788        // only thing the codec consults.
789        let raised = MAX_FRAME_BYTES.saturating_mul(8);
790        let row = DataRow {
791            stream: "rows".to_owned(),
792            sequence: 0,
793            payload: raw_json(&json!({ "blob": "x".repeat(2 * MAX_FRAME_BYTES as usize) })),
794        };
795        let mut buf = Vec::new();
796        write_frame(&mut buf, Message::DataRow(row.clone()), raised).expect("oversize-under-default frame writes");
797        let frame = read_frame(&mut Cursor::new(buf), raised).expect("oversize-under-default frame reads");
798        assert_eq!(frame.message, Message::DataRow(row));
799    }
800
801    #[test]
802    fn frame_cap_bounds_constants_are_consistent() {
803        const { assert!(MIN_FRAME_BYTES <= MAX_FRAME_BYTES) };
804        const { assert!(MAX_FRAME_BYTES <= MAX_FRAME_BYTES_HARD_CAP) };
805    }
806
807    #[test]
808    fn malformed_frame_payload_is_protocol_error() {
809        let mut bytes = Vec::new();
810        bytes.extend_from_slice(&1_u32.to_be_bytes());
811        bytes.push(b'{');
812        let err = read_frame(&mut Cursor::new(bytes), MAX_FRAME_BYTES).expect_err("malformed JSON is rejected");
813        match err {
814            ProtocolError::Json(_) => {}
815            other @ (ProtocolError::Io(_)
816            | ProtocolError::FrameTooLarge { .. }
817            | ProtocolError::VersionMismatch { .. }) => {
818                panic!("expected Json error, got {other:?}");
819            }
820        }
821    }
822
823    #[test]
824    fn rows_complete_response_round_trips() {
825        let mut bytes = Vec::new();
826        write_frame(
827            &mut bytes,
828            Message::Response(Response::RowsComplete { count: 2 }),
829            MAX_FRAME_BYTES,
830        )
831        .expect("rows complete writes");
832        let frame = read_frame(&mut Cursor::new(bytes), MAX_FRAME_BYTES).expect("rows complete reads");
833        assert_eq!(frame.message, Message::Response(Response::RowsComplete { count: 2 }));
834    }
835
836    #[test]
837    fn declaration_search_request_and_response_round_trip() {
838        let request = Message::Request(Request::SearchDeclarations {
839            search: LeanWorkerDeclarationSearch {
840                name_fragment: Some("map".to_owned()),
841                name_match: LeanWorkerDeclarationNameMatch::Suffix,
842                kind: Some("theorem".to_owned()),
843                required_constants: vec!["List.map".to_owned()],
844                conclusion_head: Some("Eq".to_owned()),
845                scope_biases: vec![LeanWorkerDeclarationSearchBias {
846                    scope: LeanWorkerDeclarationSearchScope::Namespace,
847                    prefix: "List".to_owned(),
848                    strict: false,
849                    weight: 7,
850                }],
851                limit: 3,
852                filter: LeanWorkerDeclarationFilter {
853                    include_private: false,
854                    include_generated: false,
855                    include_internal: false,
856                },
857                include_source: false,
858            },
859        });
860        let mut bytes = Vec::new();
861        write_frame(&mut bytes, request.clone(), MAX_FRAME_BYTES).expect("declaration search request writes");
862        let frame = read_frame(&mut Cursor::new(bytes), MAX_FRAME_BYTES).expect("declaration search request reads");
863        assert_eq!(frame.message, request);
864
865        let response = Message::Response(Response::DeclarationSearch {
866            result: LeanWorkerDeclarationSearchResult {
867                declarations: vec![LeanWorkerDeclarationSearchRow {
868                    name: "List.map_map".to_owned(),
869                    kind: "theorem".to_owned(),
870                    module: Some("Init.Data.List.Lemmas".to_owned()),
871                    source: None,
872                    match_reason: "name,kind,required_constants,conclusion_head".to_owned(),
873                    score: 127,
874                    rank: 1,
875                    flags: LeanWorkerDeclarationFlags::default(),
876                }],
877                truncated: true,
878                facts: LeanWorkerDeclarationSearchFacts {
879                    declarations_scanned: 100,
880                    after_name_filter: 10,
881                    after_kind_filter: 8,
882                    after_required_constants_filter: 4,
883                    after_conclusion_filter: 2,
884                    after_scope_filter: 2,
885                    source_lookups: 0,
886                    broad_pruning: vec![LeanWorkerDeclarationSearchPruning {
887                        stage: "limit".to_owned(),
888                        reason: "broad_search_limit".to_owned(),
889                        count: 1,
890                    }],
891                    truncated: true,
892                    timings: LeanWorkerDeclarationSearchTimings {
893                        scan_micros: 1000,
894                        rank_micros: 50,
895                        source_micros: 0,
896                    },
897                },
898            },
899        });
900        let mut bytes = Vec::new();
901        write_frame(&mut bytes, response.clone(), MAX_FRAME_BYTES).expect("declaration search response writes");
902        let frame = read_frame(&mut Cursor::new(bytes), MAX_FRAME_BYTES).expect("declaration search response reads");
903        assert_eq!(frame.message, response);
904    }
905
906    #[test]
907    fn declaration_inspection_request_and_response_round_trip() {
908        let request = Message::Request(Request::InspectDeclaration {
909            request: LeanWorkerDeclarationInspectionRequest {
910                name: "List.map_map".to_owned(),
911                fields: LeanWorkerDeclarationInspectionFields {
912                    source: true,
913                    statement: true,
914                    docstring: true,
915                    attributes: true,
916                    flags: true,
917                },
918                budgets: LeanWorkerOutputBudgets {
919                    per_field_bytes: 128,
920                    total_bytes: 512,
921                },
922            },
923        });
924        let mut bytes = Vec::new();
925        write_frame(&mut bytes, request.clone(), MAX_FRAME_BYTES).expect("declaration inspection request writes");
926        let frame = read_frame(&mut Cursor::new(bytes), MAX_FRAME_BYTES).expect("declaration inspection request reads");
927        assert_eq!(frame.message, request);
928
929        let response = Message::Response(Response::DeclarationInspection {
930            result: LeanWorkerDeclarationInspectionResult::Found {
931                declaration: Box::new(LeanWorkerDeclarationInspection {
932                    name: "List.map_map".to_owned(),
933                    kind: "theorem".to_owned(),
934                    module: Some("Init.Data.List.Lemmas".to_owned()),
935                    source: Some(LeanWorkerSourceRange {
936                        file: "Init/Data/List/Lemmas.lean".to_owned(),
937                        start_line: 1,
938                        start_column: 1,
939                        end_line: 1,
940                        end_column: 10,
941                    }),
942                    statement: Some(LeanWorkerRenderedInfo {
943                        value: "forall ...".to_owned(),
944                        truncated: true,
945                    }),
946                    docstring: Some(LeanWorkerRenderedInfo {
947                        value: "doc".to_owned(),
948                        truncated: false,
949                    }),
950                    attributes: vec!["simp".to_owned(), "rw".to_owned()],
951                    proof_search: LeanWorkerDeclarationProofSearchFacts {
952                        is_simp: true,
953                        is_rw_candidate: true,
954                        is_instance: false,
955                        is_class: false,
956                        class_name: None,
957                    },
958                    flags: LeanWorkerDeclarationFlags::default(),
959                }),
960            },
961        });
962        let mut bytes = Vec::new();
963        write_frame(&mut bytes, response.clone(), MAX_FRAME_BYTES).expect("declaration inspection response writes");
964        let frame =
965            read_frame(&mut Cursor::new(bytes), MAX_FRAME_BYTES).expect("declaration inspection response reads");
966        assert_eq!(frame.message, response);
967
968        let not_found = Message::Response(Response::DeclarationInspection {
969            result: LeanWorkerDeclarationInspectionResult::NotFound {
970                name: "Missing.name".to_owned(),
971            },
972        });
973        let mut bytes = Vec::new();
974        write_frame(&mut bytes, not_found.clone(), MAX_FRAME_BYTES)
975            .expect("declaration inspection not-found response writes");
976        let frame = read_frame(&mut Cursor::new(bytes), MAX_FRAME_BYTES)
977            .expect("declaration inspection not-found response reads");
978        assert_eq!(frame.message, not_found);
979
980        let unsupported = Message::Response(Response::DeclarationInspection {
981            result: LeanWorkerDeclarationInspectionResult::Unsupported,
982        });
983        let mut bytes = Vec::new();
984        write_frame(&mut bytes, unsupported.clone(), MAX_FRAME_BYTES)
985            .expect("declaration inspection unsupported response writes");
986        let frame = read_frame(&mut Cursor::new(bytes), MAX_FRAME_BYTES)
987            .expect("declaration inspection unsupported response reads");
988        assert_eq!(frame.message, unsupported);
989    }
990
991    #[test]
992    fn module_query_request_and_response_round_trip() {
993        let request = Message::Request(Request::ProcessModuleQuery {
994            source: "def x := 1\n#check x\n".to_owned(),
995            query: LeanWorkerModuleQuery::TypeAt { line: 2, column: 8 },
996            options: LeanWorkerElabOptions::default(),
997        });
998        let mut bytes = Vec::new();
999        write_frame(&mut bytes, request.clone(), MAX_FRAME_BYTES).expect("module query request writes");
1000        let frame = read_frame(&mut Cursor::new(bytes), MAX_FRAME_BYTES).expect("module query request reads");
1001        assert_eq!(frame.message, request);
1002
1003        let response = Message::Response(Response::ProcessModuleQuery {
1004            outcome: LeanWorkerModuleQueryOutcome::Ok {
1005                imports: Vec::new(),
1006                result: LeanWorkerModuleQueryResult::TypeAt(LeanWorkerTypeAtResult::Term {
1007                    span: LeanWorkerModuleSourceSpan {
1008                        start_line: 2,
1009                        start_column: 8,
1010                        end_line: 2,
1011                        end_column: 9,
1012                    },
1013                    expr: LeanWorkerRenderedInfo {
1014                        value: "x".to_owned(),
1015                        truncated: false,
1016                    },
1017                    type_str: LeanWorkerRenderedInfo {
1018                        value: "Nat".to_owned(),
1019                        truncated: false,
1020                    },
1021                    expected_type: None,
1022                }),
1023            },
1024        });
1025        let mut bytes = Vec::new();
1026        write_frame(&mut bytes, response.clone(), MAX_FRAME_BYTES).expect("module query response writes");
1027        let frame = read_frame(&mut Cursor::new(bytes), MAX_FRAME_BYTES).expect("module query response reads");
1028        assert_eq!(frame.message, response);
1029
1030        let unsupported = LeanWorkerModuleQueryOutcome::Unsupported;
1031        let json = serde_json::to_value(&unsupported).expect("unsupported serializes");
1032        assert_eq!(json, json!({ "status": "unsupported" }));
1033
1034        let diagnostics = LeanWorkerModuleQueryResult::Diagnostics(LeanWorkerElabFailure {
1035            diagnostics: Vec::new(),
1036            truncated: false,
1037        });
1038        let json = serde_json::to_value(&diagnostics).expect("diagnostics serializes");
1039        assert_eq!(
1040            json,
1041            json!({
1042                "result": "diagnostics",
1043                "body": {
1044                    "diagnostics": [],
1045                    "truncated": false
1046                }
1047            })
1048        );
1049    }
1050
1051    #[test]
1052    fn module_query_batch_request_and_response_round_trip() {
1053        let request = Message::Request(Request::ProcessModuleQueryBatch {
1054            source: "theorem t : True := by\n  trivial\n".to_owned(),
1055            selectors: vec![
1056                LeanWorkerModuleQuerySelector::Diagnostics {
1057                    id: "diagnostics".to_owned(),
1058                },
1059                LeanWorkerModuleQuerySelector::ProofState {
1060                    id: "state".to_owned(),
1061                    line: 2,
1062                    column: 4,
1063                },
1064            ],
1065            budgets: LeanWorkerOutputBudgets::default(),
1066            options: LeanWorkerElabOptions::default(),
1067        });
1068        let mut bytes = Vec::new();
1069        write_frame(&mut bytes, request.clone(), MAX_FRAME_BYTES).expect("module query batch request writes");
1070        let frame = read_frame(&mut Cursor::new(bytes), MAX_FRAME_BYTES).expect("module query batch request reads");
1071        assert_eq!(frame.message, request);
1072
1073        let response = Message::Response(Response::ProcessModuleQueryBatch {
1074            outcome: LeanWorkerModuleQueryBatchOutcome::Ok {
1075                imports: Vec::new(),
1076                result: LeanWorkerModuleQueryBatchEnvelope {
1077                    items: vec![LeanWorkerModuleQueryBatchItem::Ok {
1078                        id: "diagnostics".to_owned(),
1079                        result: Box::new(LeanWorkerModuleQueryBatchResult::Diagnostics(LeanWorkerElabFailure {
1080                            diagnostics: Vec::new(),
1081                            truncated: false,
1082                        })),
1083                    }],
1084                    total_truncated: false,
1085                },
1086                facts: LeanWorkerModuleQueryCacheFacts {
1087                    cache_status: LeanWorkerModuleCacheStatus::Miss,
1088                    timings: LeanWorkerModuleQueryTimings::zero(),
1089                    output_bytes: 0,
1090                    cache_entry_count: Some(1),
1091                    cache_approx_bytes: Some(1024),
1092                },
1093            },
1094        });
1095        let mut bytes = Vec::new();
1096        write_frame(&mut bytes, response.clone(), MAX_FRAME_BYTES).expect("module query batch response writes");
1097        let frame = read_frame(&mut Cursor::new(bytes), MAX_FRAME_BYTES).expect("module query batch response reads");
1098        assert_eq!(frame.message, response);
1099    }
1100
1101    #[test]
1102    fn proof_attempt_request_and_response_round_trip() {
1103        let span = LeanWorkerModuleSourceSpan {
1104            start_line: 1,
1105            start_column: 22,
1106            end_line: 2,
1107            end_column: 7,
1108        };
1109        let request = Message::Request(Request::AttemptProof {
1110            request: LeanWorkerProofAttemptRequest {
1111                source: "theorem t : True := by\n  trivial\n".to_owned(),
1112                edit: LeanWorkerProofEditTarget::Declaration {
1113                    name: "t".to_owned(),
1114                    position: LeanWorkerProofPositionSelector::Default,
1115                },
1116                candidates: vec![LeanWorkerProofCandidate {
1117                    id: "rfl".to_owned(),
1118                    text: "by trivial".to_owned(),
1119                }],
1120                budgets: LeanWorkerOutputBudgets::default(),
1121            },
1122            options: LeanWorkerElabOptions::default(),
1123            progress: true,
1124        });
1125        let mut bytes = Vec::new();
1126        write_frame(&mut bytes, request.clone(), MAX_FRAME_BYTES).expect("proof attempt request writes");
1127        let frame = read_frame(&mut Cursor::new(bytes), MAX_FRAME_BYTES).expect("proof attempt request reads");
1128        assert_eq!(frame.message, request);
1129
1130        let response = Message::Response(Response::ProofAttempt {
1131            result: LeanWorkerProofAttemptResult::Ok {
1132                imports: Vec::new(),
1133                result: LeanWorkerProofAttemptEnvelope {
1134                    candidates: vec![LeanWorkerProofAttemptRow {
1135                        id: "rfl".to_owned(),
1136                        status: LeanWorkerProofAttemptStatus::Closed,
1137                        candidate_text: LeanWorkerRenderedInfo {
1138                            value: "rfl".to_owned(),
1139                            truncated: false,
1140                        },
1141                        diagnostics: LeanWorkerElabFailure {
1142                            diagnostics: Vec::new(),
1143                            truncated: false,
1144                        },
1145                        downstream_diagnostics: LeanWorkerElabFailure {
1146                            diagnostics: Vec::new(),
1147                            truncated: false,
1148                        },
1149                        goals: Vec::new(),
1150                        declaration: Some(LeanWorkerDeclarationTargetInfo {
1151                            short_name: "t".to_owned(),
1152                            declaration_name: "t".to_owned(),
1153                            namespace_name: String::new(),
1154                            declaration_kind: "theorem".to_owned(),
1155                            declaration_span: span.clone(),
1156                            name_span: span.clone(),
1157                            body_span: span,
1158                        }),
1159                        proof_position: Some(LeanWorkerProofPositionSummary {
1160                            index: 0,
1161                            tactic: LeanWorkerRenderedInfo {
1162                                value: "trivial".to_owned(),
1163                                truncated: false,
1164                            },
1165                        }),
1166                        output_truncated: false,
1167                    }],
1168                    candidate_limit: 8,
1169                    candidates_truncated: false,
1170                },
1171            },
1172        });
1173        let mut bytes = Vec::new();
1174        write_frame(&mut bytes, response.clone(), MAX_FRAME_BYTES).expect("proof attempt response writes");
1175        let frame = read_frame(&mut Cursor::new(bytes), MAX_FRAME_BYTES).expect("proof attempt response reads");
1176        assert_eq!(frame.message, response);
1177    }
1178
1179    #[test]
1180    fn declaration_verification_request_and_response_round_trip() {
1181        let request = Message::Request(Request::VerifyDeclaration {
1182            request: LeanWorkerDeclarationVerificationRequest {
1183                source: "theorem t : True := by\n  trivial\n".to_owned(),
1184                target: LeanWorkerDeclarationVerificationTarget::Name { name: "t".to_owned() },
1185                sorry_policy: LeanWorkerSorryPolicy::Deny,
1186                report_axioms: true,
1187                budgets: LeanWorkerOutputBudgets::default(),
1188            },
1189            options: LeanWorkerElabOptions::default(),
1190            progress: false,
1191        });
1192        let mut bytes = Vec::new();
1193        write_frame(&mut bytes, request.clone(), MAX_FRAME_BYTES).expect("verification request writes");
1194        let frame = read_frame(&mut Cursor::new(bytes), MAX_FRAME_BYTES).expect("verification request reads");
1195        assert_eq!(frame.message, request);
1196
1197        let response = Message::Response(Response::DeclarationVerification {
1198            result: LeanWorkerDeclarationVerificationResult::Ok {
1199                verification_status: LeanWorkerDeclarationVerificationStatus::Accepted,
1200                facts: Box::new(LeanWorkerDeclarationVerificationFacts {
1201                    target: None,
1202                    diagnostics: LeanWorkerElabFailure {
1203                        diagnostics: Vec::new(),
1204                        truncated: false,
1205                    },
1206                    unresolved_goals: Vec::new(),
1207                    contains_sorry: false,
1208                    contains_admit: false,
1209                    contains_sorry_ax: false,
1210                    axioms: Vec::new(),
1211                    axioms_truncated: false,
1212                    output_truncated: false,
1213                }),
1214                imports: Vec::new(),
1215            },
1216        });
1217        let mut bytes = Vec::new();
1218        write_frame(&mut bytes, response.clone(), MAX_FRAME_BYTES).expect("verification response writes");
1219        let frame = read_frame(&mut Cursor::new(bytes), MAX_FRAME_BYTES).expect("verification response reads");
1220        assert_eq!(frame.message, response);
1221    }
1222}