Skip to main content

lean_rs_worker_protocol/
protocol.rs

1//! Length-delimited frame codec and message payload types for the
2//! parent ↔ child worker process boundary.
3//!
4//! ## Additive evolution
5//!
6//! Every public enum here is `#[non_exhaustive]` so the wire format can gain
7//! a new request, response, or message kind without forcing a semver-major
8//! bump on consumers. Most structs are also `#[non_exhaustive]` and expose
9//! `pub fn new(...)` constructors so the shapes can grow fields without
10//! breaking external builders. The exception is [`DataRow`], which is built
11//! so frequently with struct-literal syntax (tests, harnesses, fakes) that
12//! the ergonomic cost of `#[non_exhaustive]` outweighs the additive-evolution
13//! benefit; the wire schema for a data row is also already fixed by the
14//! stream contract.
15
16use std::collections::BTreeMap;
17use std::fmt;
18use std::io::{self, Read, Write};
19use std::time::Duration;
20
21use serde::{Deserialize, Serialize};
22use serde_json::Value;
23use serde_json::value::RawValue;
24
25use crate::types::{
26    LeanWorkerCapabilityMetadata, LeanWorkerDeclarationFilter, LeanWorkerDeclarationRow, LeanWorkerDeclarationSearch,
27    LeanWorkerDeclarationSearchResult, LeanWorkerDeclarationType, LeanWorkerDoctorReport, LeanWorkerElabOptions,
28    LeanWorkerElabResult, LeanWorkerKernelResult, LeanWorkerMetaResult, LeanWorkerMetaTransparency,
29    LeanWorkerModuleQuery, LeanWorkerModuleQueryBatchOutcome, LeanWorkerModuleQueryOutcome,
30    LeanWorkerModuleQuerySelector, LeanWorkerOutputBudgets, LeanWorkerRendered,
31};
32
33/// Wire protocol version negotiated between parent and child during the
34/// handshake frame. Bump only on a breaking wire change.
35pub const PROTOCOL_VERSION: u16 = 7;
36
37/// Default per-frame size limit applied by the parent when no explicit cap is
38/// configured on the capability builder.
39///
40/// The cap is a parent-side policy decision negotiated to the child at
41/// handshake time via [`Message::ConfigureFrameLimit`]. Both [`write_frame`]
42/// and [`read_frame`] reject frames whose serialised JSON payload exceeds the
43/// cap passed in, so a runaway producer cannot make the peer allocate without
44/// bound. The cap is per-connection—set once at handshake, applied to every
45/// subsequent frame in both directions.
46pub const MAX_FRAME_BYTES: u32 = 1024 * 1024;
47
48/// Floor on the configurable frame cap. Trivial requests and the handshake
49/// itself must fit inside this; callers cannot configure smaller.
50pub const MIN_FRAME_BYTES: u32 = 64 * 1024;
51
52/// Ceiling on the configurable frame cap. Prevents callers from defeating the
53/// memory-safety policy by passing an absurdly large value.
54pub const MAX_FRAME_BYTES_HARD_CAP: u32 = 256 * 1024 * 1024;
55
56/// Versioned envelope around a single protocol [`Message`].
57#[derive(Clone, Debug, Deserialize, Eq, PartialEq, Serialize)]
58#[non_exhaustive]
59pub struct Frame {
60    /// Protocol version the sender used. Receivers reject mismatches.
61    pub version: u16,
62    /// Inner message payload.
63    pub message: Message,
64}
65
66impl Frame {
67    /// Wrap `message` in a frame tagged with the current [`PROTOCOL_VERSION`].
68    #[must_use]
69    pub fn new(message: Message) -> Self {
70        Self {
71            version: PROTOCOL_VERSION,
72            message,
73        }
74    }
75}
76
77/// One protocol message exchanged over the worker boundary.
78#[derive(Clone, Debug, Deserialize, Eq, PartialEq, Serialize)]
79#[serde(tag = "type", content = "body", rename_all = "snake_case")]
80#[non_exhaustive]
81pub enum Message {
82    /// Sent by the child immediately after spawn to advertise its version and
83    /// supported protocol revision.
84    Handshake {
85        /// `lean-rs-worker-child` package version.
86        worker_version: String,
87        /// Protocol version the child speaks. Parent rejects mismatches.
88        protocol_version: u16,
89    },
90    /// Sent by the parent immediately after the handshake frame to negotiate
91    /// the per-frame size cap for the remainder of this connection.
92    ///
93    /// The parent owns the memory-safety policy: it clamps the requested cap
94    /// into <code>[[MIN_FRAME_BYTES], [MAX_FRAME_BYTES_HARD_CAP]]</code>
95    /// before sending. The child installs the value as-is.
96    ConfigureFrameLimit {
97        /// Per-frame byte cap applied in both directions for this connection.
98        max_frame_bytes: u32,
99    },
100    /// Parent → child request frame.
101    Request(Request),
102    /// Child → parent terminal response for one request.
103    Response(Response),
104    /// Child → parent intermediate diagnostic frame.
105    Diagnostic(Diagnostic),
106    /// Child → parent intermediate progress frame.
107    ProgressTick(ProgressTick),
108    /// Child → parent streaming data row frame.
109    DataRow(DataRow),
110    /// Child → parent fatal exit notification carrying the captured stderr
111    /// just before the child process tears down.
112    FatalExit(FatalExit),
113}
114
115/// Parent-issued worker request body.
116#[derive(Clone, Debug, Deserialize, Eq, PartialEq, Serialize)]
117#[serde(tag = "op", rename_all = "snake_case")]
118#[non_exhaustive]
119pub enum Request {
120    Health,
121    LoadFixtureCapability {
122        manifest_path: String,
123    },
124    CallFixtureMul {
125        manifest_path: String,
126        lhs: u64,
127        rhs: u64,
128    },
129    TriggerLeanPanic {
130        manifest_path: String,
131    },
132    OpenHostSession {
133        project_root: String,
134        mode: HostSessionMode,
135        imports: Vec<String>,
136    },
137    Elaborate {
138        source: String,
139        options: LeanWorkerElabOptions,
140    },
141    KernelCheck {
142        source: String,
143        options: LeanWorkerElabOptions,
144        progress: bool,
145    },
146    DeclarationKinds {
147        names: Vec<String>,
148        progress: bool,
149    },
150    DeclarationNames {
151        names: Vec<String>,
152        progress: bool,
153    },
154    RunDataStream {
155        export: String,
156        request_json: String,
157        progress: bool,
158    },
159    CapabilityMetadata {
160        export: String,
161        request_json: String,
162    },
163    CapabilityDoctor {
164        export: String,
165        request_json: String,
166    },
167    JsonCommand {
168        export: String,
169        request_json: String,
170    },
171    InferType {
172        source: String,
173        options: LeanWorkerElabOptions,
174    },
175    Whnf {
176        source: String,
177        options: LeanWorkerElabOptions,
178    },
179    IsDefEq {
180        lhs: String,
181        rhs: String,
182        transparency: LeanWorkerMetaTransparency,
183        options: LeanWorkerElabOptions,
184    },
185    Describe {
186        name: String,
187    },
188    SearchDeclarations {
189        search: LeanWorkerDeclarationSearch,
190    },
191    DeclarationType {
192        name: String,
193        max_bytes: usize,
194    },
195    ListDeclarationsStrings {
196        filter: LeanWorkerDeclarationFilter,
197        progress: bool,
198    },
199    DescribeBulk {
200        names: Vec<String>,
201        progress: bool,
202    },
203    ProcessModuleQuery {
204        source: String,
205        query: LeanWorkerModuleQuery,
206        options: LeanWorkerElabOptions,
207    },
208    ProcessModuleQueryBatch {
209        source: String,
210        selectors: Vec<LeanWorkerModuleQuerySelector>,
211        budgets: LeanWorkerOutputBudgets,
212        options: LeanWorkerElabOptions,
213    },
214    ClearModuleSnapshotCache,
215    // Private harness requests that exercise streaming frame behavior.
216    // Not part of the public row sink API.
217    EmitTestRows {
218        streams: Vec<String>,
219    },
220    EmitTestRowsThenExit,
221    EmitTestRowsThenPanic,
222    Terminate,
223}
224
225/// How the worker child should load host-session capabilities.
226#[derive(Clone, Debug, Deserialize, Eq, PartialEq, Serialize)]
227#[serde(tag = "kind", rename_all = "snake_case")]
228#[non_exhaustive]
229pub enum HostSessionMode {
230    /// Open a user capability dylib and the bundled host shims.
231    Capability {
232        package: String,
233        lib_name: String,
234        manifest_path: Option<String>,
235    },
236    /// Open only the bundled host shims.
237    ShimsOnly,
238}
239
240/// Child-issued terminal response body for one [`Request`].
241#[derive(Clone, Debug, Deserialize, Eq, PartialEq, Serialize)]
242#[serde(tag = "status", rename_all = "snake_case")]
243#[non_exhaustive]
244pub enum Response {
245    HealthOk,
246    CapabilityLoaded,
247    U64 {
248        value: u64,
249    },
250    HostSessionOpened,
251    Elaboration {
252        outcome: LeanWorkerElabResult,
253    },
254    KernelCheck {
255        outcome: LeanWorkerKernelResult,
256    },
257    Strings {
258        values: Vec<String>,
259    },
260    StreamComplete {
261        summary: StreamSummary,
262    },
263    StreamExportFailed {
264        status_byte: u8,
265    },
266    StreamCallbackFailed {
267        status_byte: u8,
268        description: String,
269    },
270    StreamRowMalformed {
271        message: String,
272    },
273    CapabilityMetadata {
274        metadata: LeanWorkerCapabilityMetadata,
275    },
276    CapabilityDoctor {
277        report: LeanWorkerDoctorReport,
278    },
279    CapabilityMetadataMalformed {
280        message: String,
281    },
282    CapabilityDoctorMalformed {
283        message: String,
284    },
285    JsonCommand {
286        response_json: String,
287    },
288    MetaExpr {
289        result: LeanWorkerMetaResult<LeanWorkerRendered>,
290    },
291    MetaBool {
292        result: LeanWorkerMetaResult<bool>,
293    },
294    Declaration {
295        row: Option<LeanWorkerDeclarationRow>,
296    },
297    DeclarationSearch {
298        result: LeanWorkerDeclarationSearchResult,
299    },
300    DeclarationType {
301        row: Option<LeanWorkerDeclarationType>,
302    },
303    DeclarationBulk {
304        rows: Vec<LeanWorkerDeclarationRow>,
305    },
306    ProcessModuleQuery {
307        outcome: LeanWorkerModuleQueryOutcome,
308    },
309    ProcessModuleQueryBatch {
310        outcome: LeanWorkerModuleQueryBatchOutcome,
311    },
312    ModuleSnapshotCacheCleared {
313        result: crate::types::LeanWorkerModuleSnapshotCacheClearResult,
314    },
315    RowsComplete {
316        count: u64,
317    },
318    Terminating,
319    Error {
320        code: String,
321        message: String,
322    },
323}
324
325/// Intermediate diagnostic frame emitted by the child during a request.
326#[derive(Clone, Debug, Deserialize, Eq, PartialEq, Serialize)]
327#[non_exhaustive]
328pub struct Diagnostic {
329    /// Stable diagnostic code identifier.
330    pub code: String,
331    /// Bounded human-readable diagnostic message.
332    pub message: String,
333}
334
335impl Diagnostic {
336    /// Build a diagnostic frame payload.
337    #[must_use]
338    pub fn new(code: impl Into<String>, message: impl Into<String>) -> Self {
339        Self {
340            code: code.into(),
341            message: message.into(),
342        }
343    }
344}
345
346/// Intermediate progress frame emitted by the child during a long-running
347/// request.
348#[derive(Clone, Debug, Deserialize, Eq, PartialEq, Serialize)]
349#[non_exhaustive]
350pub struct ProgressTick {
351    /// Phase name the child is reporting progress for.
352    pub phase: String,
353    /// Items completed so far in this phase.
354    pub current: u64,
355    /// Total expected items in this phase, if known.
356    pub total: Option<u64>,
357}
358
359impl ProgressTick {
360    /// Build a progress-tick frame payload.
361    #[must_use]
362    pub fn new(phase: impl Into<String>, current: u64, total: Option<u64>) -> Self {
363        Self {
364            phase: phase.into(),
365            current,
366            total,
367        }
368    }
369}
370
371/// One row in a streaming response.
372///
373/// Construction goes through [`DataRowEmitter::next`] in the child runtime;
374/// direct struct-literal construction is permitted in tests and harnesses.
375/// This struct intentionally stays exhaustive: see the module-level note on
376/// additive evolution.
377#[derive(Clone, Debug, Deserialize, Serialize)]
378pub struct DataRow {
379    /// Logical stream this row belongs to.
380    pub stream: String,
381    /// Per-stream monotonically increasing sequence number.
382    pub sequence: u64,
383    /// Opaque JSON payload (deserialised lazily by the parent).
384    pub payload: Box<RawValue>,
385}
386
387impl PartialEq for DataRow {
388    fn eq(&self, other: &Self) -> bool {
389        self.stream == other.stream && self.sequence == other.sequence && self.payload.get() == other.payload.get()
390    }
391}
392
393impl Eq for DataRow {}
394
395/// Terminal stream-completion summary returned alongside a streaming response.
396#[derive(Clone, Debug, Deserialize, Eq, PartialEq, Serialize)]
397#[non_exhaustive]
398pub struct StreamSummary {
399    /// Total rows emitted across all streams.
400    pub total_rows: u64,
401    /// Per-stream row counts at completion.
402    pub per_stream_counts: BTreeMap<String, u64>,
403    /// Child-side elapsed time in microseconds.
404    pub elapsed_micros: u64,
405    /// Optional downstream-defined terminal metadata.
406    pub metadata: Option<Value>,
407}
408
409impl StreamSummary {
410    /// Build a stream-completion summary, clamping the elapsed duration into
411    /// the `u64` micros field.
412    #[must_use]
413    pub fn new(
414        total_rows: u64,
415        per_stream_counts: BTreeMap<String, u64>,
416        elapsed: Duration,
417        metadata: Option<Value>,
418    ) -> Self {
419        Self {
420            total_rows,
421            per_stream_counts,
422            elapsed_micros: elapsed.as_micros().try_into().unwrap_or(u64::MAX),
423            metadata,
424        }
425    }
426}
427
428/// Stateful emitter that assigns per-stream sequence numbers and tracks the
429/// running row count for the terminal [`StreamSummary`].
430#[derive(Debug, Default)]
431#[non_exhaustive]
432pub struct DataRowEmitter {
433    sequences: BTreeMap<String, u64>,
434    count: u64,
435}
436
437impl DataRowEmitter {
438    /// Allocate the next [`DataRow`] for `stream`, advancing the per-stream
439    /// sequence and the overall count.
440    pub fn next(&mut self, stream: impl Into<String>, payload: Box<RawValue>) -> DataRow {
441        let stream = stream.into();
442        let sequence = self.sequences.entry(stream.clone()).or_insert(0);
443        let row = DataRow {
444            stream,
445            sequence: *sequence,
446            payload,
447        };
448        *sequence = sequence.saturating_add(1);
449        self.count = self.count.saturating_add(1);
450        row
451    }
452
453    #[cfg(test)]
454    fn emit(
455        &mut self,
456        writer: &mut impl Write,
457        stream: impl Into<String>,
458        payload: &Value,
459    ) -> Result<(), ProtocolError> {
460        let row = self.next(stream, serde_json::value::to_raw_value(payload)?);
461        write_frame(writer, Message::DataRow(row), MAX_FRAME_BYTES)
462    }
463
464    /// Total rows emitted across all streams.
465    #[must_use]
466    pub fn count(&self) -> u64 {
467        self.count
468    }
469
470    /// Snapshot of per-stream row counts.
471    #[must_use]
472    pub fn per_stream_counts(&self) -> BTreeMap<String, u64> {
473        self.sequences.clone()
474    }
475}
476
477/// Final frame the child writes before it tears down on an unrecoverable
478/// failure.
479#[derive(Clone, Debug, Deserialize, Eq, PartialEq, Serialize)]
480#[non_exhaustive]
481pub struct FatalExit {
482    /// Stringified `ExitStatus` of the child process.
483    pub status: String,
484    /// Captured stderr tail at fatal-exit time.
485    pub stderr: String,
486}
487
488impl FatalExit {
489    /// Build a fatal-exit frame payload.
490    #[must_use]
491    pub fn new(status: impl Into<String>, stderr: impl Into<String>) -> Self {
492        Self {
493            status: status.into(),
494            stderr: stderr.into(),
495        }
496    }
497}
498
499/// Failure modes the codec can produce while reading or writing a frame.
500#[derive(Debug)]
501#[non_exhaustive]
502pub enum ProtocolError {
503    /// Underlying I/O failure (pipe closed, partial read, etc.).
504    Io(io::Error),
505    /// JSON serialisation or deserialisation failure.
506    Json(serde_json::Error),
507    /// A frame body exceeded [`MAX_FRAME_BYTES`].
508    FrameTooLarge {
509        /// Observed frame size in bytes.
510        len: u32,
511        /// Maximum allowed frame size.
512        max: u32,
513    },
514    /// Peer's frame version did not match this binary's [`PROTOCOL_VERSION`].
515    VersionMismatch {
516        /// Version this binary expected.
517        expected: u16,
518        /// Version the peer used.
519        actual: u16,
520    },
521}
522
523impl ProtocolError {
524    /// Whether the underlying I/O error indicates the peer's pipe was closed
525    /// (`UnexpectedEof`). Used by callers to distinguish a clean fatal exit
526    /// from a true protocol failure.
527    #[must_use]
528    pub fn is_eof(&self) -> bool {
529        matches!(self, Self::Io(err) if err.kind() == io::ErrorKind::UnexpectedEof)
530    }
531}
532
533impl fmt::Display for ProtocolError {
534    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
535        match self {
536            Self::Io(err) => write!(f, "worker protocol I/O failed: {err}"),
537            Self::Json(err) => write!(f, "worker protocol JSON decode failed: {err}"),
538            Self::FrameTooLarge { len, max } => {
539                write!(f, "worker protocol frame too large: {len} bytes exceeds {max}")
540            }
541            Self::VersionMismatch { expected, actual } => {
542                write!(
543                    f,
544                    "worker protocol version mismatch: expected {expected}, received {actual}"
545                )
546            }
547        }
548    }
549}
550
551impl std::error::Error for ProtocolError {}
552
553impl From<io::Error> for ProtocolError {
554    fn from(value: io::Error) -> Self {
555        Self::Io(value)
556    }
557}
558
559impl From<serde_json::Error> for ProtocolError {
560    fn from(value: serde_json::Error) -> Self {
561        Self::Json(value)
562    }
563}
564
565/// Serialise `message` as a length-delimited JSON frame to `writer`.
566///
567/// `max_frame_bytes` is the per-frame cap negotiated for this connection.
568/// Until the handshake completes, callers pass [`MAX_FRAME_BYTES`] as the
569/// default; afterwards the supervisor passes the
570/// [`Message::ConfigureFrameLimit`] value installed on the connection.
571///
572/// # Errors
573///
574/// Returns [`ProtocolError::FrameTooLarge`] if the serialised body would
575/// exceed `max_frame_bytes`, or the underlying [`ProtocolError::Io`] /
576/// [`ProtocolError::Json`] for codec failures.
577pub fn write_frame(writer: &mut impl Write, message: Message, max_frame_bytes: u32) -> Result<(), ProtocolError> {
578    let bytes = serde_json::to_vec(&Frame::new(message))?;
579    let len = u32::try_from(bytes.len()).map_err(|_| ProtocolError::FrameTooLarge {
580        len: u32::MAX,
581        max: max_frame_bytes,
582    })?;
583    if len > max_frame_bytes {
584        return Err(ProtocolError::FrameTooLarge {
585            len,
586            max: max_frame_bytes,
587        });
588    }
589    writer.write_all(&len.to_be_bytes())?;
590    writer.write_all(&bytes)?;
591    writer.flush()?;
592    Ok(())
593}
594
595/// Read one length-delimited JSON frame from `reader`.
596///
597/// `max_frame_bytes` is the per-frame cap negotiated for this connection. See
598/// [`write_frame`] for the back-compat default and post-handshake semantics.
599///
600/// # Errors
601///
602/// Returns [`ProtocolError::FrameTooLarge`] if the framed length exceeds
603/// `max_frame_bytes` (rejected before allocation),
604/// [`ProtocolError::VersionMismatch`] if the peer's version does not match
605/// [`PROTOCOL_VERSION`], or the underlying [`ProtocolError::Io`] /
606/// [`ProtocolError::Json`] for codec failures.
607pub fn read_frame(reader: &mut impl Read, max_frame_bytes: u32) -> Result<Frame, ProtocolError> {
608    let mut len_bytes = [0_u8; 4];
609    reader.read_exact(&mut len_bytes)?;
610    let len = u32::from_be_bytes(len_bytes);
611    if len > max_frame_bytes {
612        return Err(ProtocolError::FrameTooLarge {
613            len,
614            max: max_frame_bytes,
615        });
616    }
617    let mut bytes = vec![0_u8; len as usize];
618    reader.read_exact(&mut bytes)?;
619    let frame: Frame = serde_json::from_slice(&bytes)?;
620    if frame.version != PROTOCOL_VERSION {
621        return Err(ProtocolError::VersionMismatch {
622            expected: PROTOCOL_VERSION,
623            actual: frame.version,
624        });
625    }
626    Ok(frame)
627}
628
629#[cfg(test)]
630mod tests {
631    #![allow(clippy::expect_used, clippy::panic)]
632
633    use std::io::Cursor;
634
635    use serde_json::json;
636    use serde_json::value::RawValue;
637
638    use super::{
639        DataRow, DataRowEmitter, MAX_FRAME_BYTES, MAX_FRAME_BYTES_HARD_CAP, MIN_FRAME_BYTES, Message, ProtocolError,
640        Request, Response, read_frame, write_frame,
641    };
642    use crate::types::{
643        LeanWorkerElabFailure, LeanWorkerElabOptions, LeanWorkerModuleCacheStatus, LeanWorkerModuleQuery,
644        LeanWorkerModuleQueryBatchEnvelope, LeanWorkerModuleQueryBatchItem, LeanWorkerModuleQueryBatchOutcome,
645        LeanWorkerModuleQueryBatchResult, LeanWorkerModuleQueryCacheFacts, LeanWorkerModuleQueryOutcome,
646        LeanWorkerModuleQueryResult, LeanWorkerModuleQuerySelector, LeanWorkerModuleQueryTimings,
647        LeanWorkerModuleSourceSpan, LeanWorkerOutputBudgets, LeanWorkerRenderedInfo, LeanWorkerTypeAtResult,
648    };
649
650    fn raw_json(value: &serde_json::Value) -> Box<RawValue> {
651        serde_json::value::to_raw_value(value).expect("test JSON converts to raw value")
652    }
653
654    #[test]
655    fn data_row_round_trips_through_length_delimited_frame() {
656        let row = DataRow {
657            stream: "rows".to_owned(),
658            sequence: 7,
659            payload: raw_json(&json!({ "name": "Nat.add", "score": 3 })),
660        };
661        let mut bytes = Vec::new();
662        write_frame(&mut bytes, Message::DataRow(row.clone()), MAX_FRAME_BYTES).expect("data row writes");
663        let frame = read_frame(&mut Cursor::new(bytes), MAX_FRAME_BYTES).expect("data row reads");
664        assert_eq!(frame.message, Message::DataRow(row));
665    }
666
667    #[test]
668    fn data_row_emitter_assigns_per_stream_sequences() {
669        let mut emitter = DataRowEmitter::default();
670        let mut bytes = Vec::new();
671        emitter
672            .emit(&mut bytes, "rows", &json!({ "i": 0 }))
673            .expect("first row writes");
674        emitter
675            .emit(&mut bytes, "warnings", &json!({ "i": 1 }))
676            .expect("second row writes");
677        emitter
678            .emit(&mut bytes, "rows", &json!({ "i": 2 }))
679            .expect("third row writes");
680        assert_eq!(emitter.count(), 3);
681
682        let mut cursor = Cursor::new(bytes);
683        let rows = [
684            read_frame(&mut cursor, MAX_FRAME_BYTES).expect("first row reads"),
685            read_frame(&mut cursor, MAX_FRAME_BYTES).expect("second row reads"),
686            read_frame(&mut cursor, MAX_FRAME_BYTES).expect("third row reads"),
687        ];
688        assert_eq!(
689            rows.map(|frame| frame.message),
690            [
691                Message::DataRow(DataRow {
692                    stream: "rows".to_owned(),
693                    sequence: 0,
694                    payload: raw_json(&json!({ "i": 0 })),
695                }),
696                Message::DataRow(DataRow {
697                    stream: "warnings".to_owned(),
698                    sequence: 0,
699                    payload: raw_json(&json!({ "i": 1 })),
700                }),
701                Message::DataRow(DataRow {
702                    stream: "rows".to_owned(),
703                    sequence: 1,
704                    payload: raw_json(&json!({ "i": 2 })),
705                }),
706            ],
707        );
708    }
709
710    #[test]
711    fn oversized_data_row_is_rejected_before_write() {
712        let row = DataRow {
713            stream: "rows".to_owned(),
714            sequence: 0,
715            payload: raw_json(&json!({ "blob": "x".repeat(MAX_FRAME_BYTES as usize) })),
716        };
717        let mut bytes = Vec::new();
718        let err =
719            write_frame(&mut bytes, Message::DataRow(row), MAX_FRAME_BYTES).expect_err("oversized frame is rejected");
720        match err {
721            ProtocolError::FrameTooLarge { len, max } => {
722                assert!(len > max);
723                assert_eq!(max, MAX_FRAME_BYTES);
724            }
725            other @ (ProtocolError::Io(_) | ProtocolError::Json(_) | ProtocolError::VersionMismatch { .. }) => {
726                panic!("expected FrameTooLarge, got {other:?}");
727            }
728        }
729    }
730
731    #[test]
732    fn oversized_data_row_is_rejected_before_read_allocation() {
733        let mut bytes = Vec::new();
734        bytes.extend_from_slice(&(MAX_FRAME_BYTES.saturating_add(1)).to_be_bytes());
735        let err = read_frame(&mut Cursor::new(bytes), MAX_FRAME_BYTES).expect_err("oversized frame is rejected");
736        match err {
737            ProtocolError::FrameTooLarge { len, max } => {
738                assert_eq!(len, MAX_FRAME_BYTES + 1);
739                assert_eq!(max, MAX_FRAME_BYTES);
740            }
741            other @ (ProtocolError::Io(_) | ProtocolError::Json(_) | ProtocolError::VersionMismatch { .. }) => {
742                panic!("expected FrameTooLarge, got {other:?}");
743            }
744        }
745    }
746
747    #[test]
748    fn larger_cap_accepts_frame_rejected_under_default() {
749        // A 2 MiB payload is rejected under MAX_FRAME_BYTES (1 MiB) but
750        // accepted when the cap is raised—proving the cap parameter is the
751        // only thing the codec consults.
752        let raised = MAX_FRAME_BYTES.saturating_mul(8);
753        let row = DataRow {
754            stream: "rows".to_owned(),
755            sequence: 0,
756            payload: raw_json(&json!({ "blob": "x".repeat(2 * MAX_FRAME_BYTES as usize) })),
757        };
758        let mut buf = Vec::new();
759        write_frame(&mut buf, Message::DataRow(row.clone()), raised).expect("oversize-under-default frame writes");
760        let frame = read_frame(&mut Cursor::new(buf), raised).expect("oversize-under-default frame reads");
761        assert_eq!(frame.message, Message::DataRow(row));
762    }
763
764    #[test]
765    fn frame_cap_bounds_constants_are_consistent() {
766        const { assert!(MIN_FRAME_BYTES <= MAX_FRAME_BYTES) };
767        const { assert!(MAX_FRAME_BYTES <= MAX_FRAME_BYTES_HARD_CAP) };
768    }
769
770    #[test]
771    fn malformed_frame_payload_is_protocol_error() {
772        let mut bytes = Vec::new();
773        bytes.extend_from_slice(&1_u32.to_be_bytes());
774        bytes.push(b'{');
775        let err = read_frame(&mut Cursor::new(bytes), MAX_FRAME_BYTES).expect_err("malformed JSON is rejected");
776        match err {
777            ProtocolError::Json(_) => {}
778            other @ (ProtocolError::Io(_)
779            | ProtocolError::FrameTooLarge { .. }
780            | ProtocolError::VersionMismatch { .. }) => {
781                panic!("expected Json error, got {other:?}");
782            }
783        }
784    }
785
786    #[test]
787    fn rows_complete_response_round_trips() {
788        let mut bytes = Vec::new();
789        write_frame(
790            &mut bytes,
791            Message::Response(Response::RowsComplete { count: 2 }),
792            MAX_FRAME_BYTES,
793        )
794        .expect("rows complete writes");
795        let frame = read_frame(&mut Cursor::new(bytes), MAX_FRAME_BYTES).expect("rows complete reads");
796        assert_eq!(frame.message, Message::Response(Response::RowsComplete { count: 2 }));
797    }
798
799    #[test]
800    fn module_query_request_and_response_round_trip() {
801        let request = Message::Request(Request::ProcessModuleQuery {
802            source: "def x := 1\n#check x\n".to_owned(),
803            query: LeanWorkerModuleQuery::TypeAt { line: 2, column: 8 },
804            options: LeanWorkerElabOptions::default(),
805        });
806        let mut bytes = Vec::new();
807        write_frame(&mut bytes, request.clone(), MAX_FRAME_BYTES).expect("module query request writes");
808        let frame = read_frame(&mut Cursor::new(bytes), MAX_FRAME_BYTES).expect("module query request reads");
809        assert_eq!(frame.message, request);
810
811        let response = Message::Response(Response::ProcessModuleQuery {
812            outcome: LeanWorkerModuleQueryOutcome::Ok {
813                imports: Vec::new(),
814                result: LeanWorkerModuleQueryResult::TypeAt(LeanWorkerTypeAtResult::Term {
815                    span: LeanWorkerModuleSourceSpan {
816                        start_line: 2,
817                        start_column: 8,
818                        end_line: 2,
819                        end_column: 9,
820                    },
821                    expr: LeanWorkerRenderedInfo {
822                        value: "x".to_owned(),
823                        truncated: false,
824                    },
825                    type_str: LeanWorkerRenderedInfo {
826                        value: "Nat".to_owned(),
827                        truncated: false,
828                    },
829                    expected_type: None,
830                }),
831            },
832        });
833        let mut bytes = Vec::new();
834        write_frame(&mut bytes, response.clone(), MAX_FRAME_BYTES).expect("module query response writes");
835        let frame = read_frame(&mut Cursor::new(bytes), MAX_FRAME_BYTES).expect("module query response reads");
836        assert_eq!(frame.message, response);
837
838        let unsupported = LeanWorkerModuleQueryOutcome::Unsupported;
839        let json = serde_json::to_value(&unsupported).expect("unsupported serializes");
840        assert_eq!(json, json!({ "status": "unsupported" }));
841
842        let diagnostics = LeanWorkerModuleQueryResult::Diagnostics(LeanWorkerElabFailure {
843            diagnostics: Vec::new(),
844            truncated: false,
845        });
846        let json = serde_json::to_value(&diagnostics).expect("diagnostics serializes");
847        assert_eq!(
848            json,
849            json!({
850                "result": "diagnostics",
851                "body": {
852                    "diagnostics": [],
853                    "truncated": false
854                }
855            })
856        );
857    }
858
859    #[test]
860    fn module_query_batch_request_and_response_round_trip() {
861        let request = Message::Request(Request::ProcessModuleQueryBatch {
862            source: "theorem t : True := by\n  trivial\n".to_owned(),
863            selectors: vec![
864                LeanWorkerModuleQuerySelector::Diagnostics {
865                    id: "diagnostics".to_owned(),
866                },
867                LeanWorkerModuleQuerySelector::ProofState {
868                    id: "state".to_owned(),
869                    line: 2,
870                    column: 4,
871                },
872            ],
873            budgets: LeanWorkerOutputBudgets::default(),
874            options: LeanWorkerElabOptions::default(),
875        });
876        let mut bytes = Vec::new();
877        write_frame(&mut bytes, request.clone(), MAX_FRAME_BYTES).expect("module query batch request writes");
878        let frame = read_frame(&mut Cursor::new(bytes), MAX_FRAME_BYTES).expect("module query batch request reads");
879        assert_eq!(frame.message, request);
880
881        let response = Message::Response(Response::ProcessModuleQueryBatch {
882            outcome: LeanWorkerModuleQueryBatchOutcome::Ok {
883                imports: Vec::new(),
884                result: LeanWorkerModuleQueryBatchEnvelope {
885                    items: vec![LeanWorkerModuleQueryBatchItem::Ok {
886                        id: "diagnostics".to_owned(),
887                        result: Box::new(LeanWorkerModuleQueryBatchResult::Diagnostics(LeanWorkerElabFailure {
888                            diagnostics: Vec::new(),
889                            truncated: false,
890                        })),
891                    }],
892                    total_truncated: false,
893                },
894                facts: LeanWorkerModuleQueryCacheFacts {
895                    cache_status: LeanWorkerModuleCacheStatus::Miss,
896                    timings: LeanWorkerModuleQueryTimings::zero(),
897                    output_bytes: 0,
898                    cache_entry_count: Some(1),
899                    cache_approx_bytes: Some(1024),
900                },
901            },
902        });
903        let mut bytes = Vec::new();
904        write_frame(&mut bytes, response.clone(), MAX_FRAME_BYTES).expect("module query batch response writes");
905        let frame = read_frame(&mut Cursor::new(bytes), MAX_FRAME_BYTES).expect("module query batch response reads");
906        assert_eq!(frame.message, response);
907    }
908}