Skip to main content

cmr_core/
compressor_ipc.rs

1//! IPC protocol for compressor isolation.
2
3use std::io::{Read, Write};
4
5use serde::{Deserialize, Serialize, de::DeserializeOwned};
6use thiserror::Error;
7
8/// Maximum message frame size used by default helpers.
9pub const DEFAULT_MAX_FRAME_BYTES: usize = 8 * 1024 * 1024;
10
11/// Compressor RPC request.
12#[derive(Clone, Debug, Serialize, Deserialize)]
13pub enum CompressorRequest {
14    /// Liveness check.
15    Health,
16    /// Compute CMR Section 3.2 compression distance:
17    /// C(XY)-C(X) + C(YX)-C(Y).
18    CompressionDistance {
19        /// Left payload (X).
20        left: Vec<u8>,
21        /// Right payload (Y).
22        right: Vec<u8>,
23    },
24    /// Compute CMR Section 3.2 compression distance for chunked payloads.
25    /// Parts are concatenated logically without requiring callers to materialize
26    /// a single contiguous buffer.
27    CompressionDistanceChain {
28        /// Left payload parts in order.
29        left_parts: Vec<Vec<u8>>,
30        /// Right payload parts in order.
31        right_parts: Vec<Vec<u8>>,
32    },
33    /// Compute intrinsic dependence of a sequence.
34    IntrinsicDependence {
35        /// Payload.
36        data: Vec<u8>,
37        /// Estimator max order.
38        max_order: i64,
39    },
40    /// Compute CMR distances from one payload to many candidates.
41    BatchCompressionDistance {
42        /// Target payload.
43        target: Vec<u8>,
44        /// Candidate payloads.
45        candidates: Vec<Vec<u8>>,
46    },
47}
48
49/// Compressor RPC response.
50#[derive(Clone, Debug, Serialize, Deserialize)]
51pub enum CompressorResponse {
52    /// Liveness check response.
53    Health { ok: bool },
54    /// Scalar CMR distance.
55    CompressionDistance { value: f64 },
56    /// Scalar intrinsic-dependence value.
57    IntrinsicDependence { value: f64 },
58    /// Batch CMR distances.
59    BatchCompressionDistance { values: Vec<f64> },
60    /// Error response from worker.
61    Error { message: String },
62}
63
64/// IPC transport errors.
65#[derive(Debug, Error)]
66pub enum IpcError {
67    /// Underlying I/O failure.
68    #[error("i/o error: {0}")]
69    Io(#[from] std::io::Error),
70    /// Serde codec failure.
71    #[error("serde error: {0}")]
72    Serde(#[from] serde_json::Error),
73    /// Length-prefix exceeds bounds.
74    #[error("frame exceeds max length")]
75    FrameTooLarge,
76}
77
78/// Writes a length-prefixed JSON frame.
79pub fn write_frame<T: Serialize>(writer: &mut impl Write, value: &T) -> Result<(), IpcError> {
80    let payload = serde_json::to_vec(value)?;
81    let len = u32::try_from(payload.len()).map_err(|_| IpcError::FrameTooLarge)?;
82    writer.write_all(&len.to_be_bytes())?;
83    writer.write_all(&payload)?;
84    writer.flush()?;
85    Ok(())
86}
87
88/// Reads a length-prefixed JSON frame.
89pub fn read_frame<T: DeserializeOwned>(
90    reader: &mut impl Read,
91    max_bytes: usize,
92) -> Result<T, IpcError> {
93    let mut len_buf = [0_u8; 4];
94    reader.read_exact(&mut len_buf)?;
95    let len = u32::from_be_bytes(len_buf) as usize;
96    if len > max_bytes {
97        return Err(IpcError::FrameTooLarge);
98    }
99    let mut payload = vec![0_u8; len];
100    reader.read_exact(&mut payload)?;
101    Ok(serde_json::from_slice(&payload)?)
102}