Skip to main content

test_r_core/
ipc.rs

1use crate::internal::{CapturedOutput, FailureCause, TestResult};
2use crate::stats::Summary;
3use desert_rust::BinaryCodec;
4use interprocess::local_socket::{
5    GenericFilePath, GenericNamespaced, Name, NameType, ToFsName, ToNsName,
6};
7use std::io::{self, Read, Write};
8use std::time::Duration;
9
/// Number of bytes taken by the length prefix that frames every IPC message.
///
/// The prefix is a `u32`, so a single frame can carry a payload of up to
/// 4 GiB, which comfortably covers Cloneable payloads such as precompiled
/// wasm components.
pub const FRAME_LEN_BYTES: usize = std::mem::size_of::<u32>();
14
/// Frames `payload` and writes it to `writer`.
///
/// A frame is the payload length encoded as a little-endian `u32`,
/// immediately followed by the payload bytes.
///
/// # Errors
///
/// Returns an [`io::ErrorKind::InvalidData`] error when the payload length
/// does not fit in a `u32`; otherwise forwards any error from the writer.
pub fn write_frame<W: Write>(writer: &mut W, payload: &[u8]) -> io::Result<()> {
    let prefix = match u32::try_from(payload.len()) {
        Ok(len) => len.to_le_bytes(),
        Err(_) => {
            return Err(io::Error::new(
                io::ErrorKind::InvalidData,
                "IPC payload size exceeds u32::MAX",
            ));
        }
    };

    writer.write_all(&prefix)?;
    writer.write_all(payload)
}
27
28/// Reads a length-prefixed frame produced by [`write_frame`].
29pub fn read_frame<R: Read>(reader: &mut R) -> io::Result<Vec<u8>> {
30    let mut len_bytes = [0u8; FRAME_LEN_BYTES];
31    reader.read_exact(&mut len_bytes)?;
32    let len = u32::from_le_bytes(len_bytes) as usize;
33    let mut payload = vec![0; len];
34    reader.read_exact(&mut payload)?;
35    Ok(payload)
36}
37
/// Async counterpart of the blocking frame writer: writes the payload length
/// as a little-endian `u32`, then the payload bytes.
///
/// # Errors
///
/// Returns an [`io::ErrorKind::InvalidData`] error when the payload length
/// does not fit in a `u32`; otherwise forwards any error from the writer.
#[cfg(feature = "tokio")]
pub async fn write_frame_async<W>(writer: &mut W, payload: &[u8]) -> io::Result<()>
where
    W: tokio::io::AsyncWriteExt + Unpin,
{
    let prefix = match u32::try_from(payload.len()) {
        Ok(len) => len.to_le_bytes(),
        Err(_) => {
            return Err(io::Error::new(
                io::ErrorKind::InvalidData,
                "IPC payload size exceeds u32::MAX",
            ));
        }
    };

    writer.write_all(&prefix).await?;
    writer.write_all(payload).await
}
52
/// Async counterpart of the blocking frame reader: reads a little-endian
/// `u32` length prefix, then exactly that many payload bytes.
///
/// The payload buffer grows as data actually arrives instead of being
/// allocated up front from the declared length, so a corrupted or truncated
/// prefix cannot force a multi-gigabyte allocation.
///
/// # Errors
///
/// Returns an [`io::ErrorKind::UnexpectedEof`] error when the stream ends
/// before the prefix or the full payload has been read, and forwards any
/// other error reported by the reader.
#[cfg(feature = "tokio")]
pub async fn read_frame_async<R>(reader: &mut R) -> io::Result<Vec<u8>>
where
    R: tokio::io::AsyncReadExt + Unpin,
{
    // Upper bound on the capacity reserved before any payload byte is seen.
    const MAX_PREALLOC: usize = 1024 * 1024;

    let mut len_bytes = [0u8; FRAME_LEN_BYTES];
    reader.read_exact(&mut len_bytes).await?;
    let declared = u32::from_le_bytes(len_bytes);
    let expected = usize::try_from(declared).map_err(|_| {
        io::Error::new(
            io::ErrorKind::InvalidData,
            "IPC frame length does not fit in usize",
        )
    })?;

    let mut payload = Vec::with_capacity(expected.min(MAX_PREALLOC));
    // Fully-qualified calls: the extension trait is only available through the
    // bound on `R`, which does not cover `&mut R` or the `Take` adaptor.
    let mut limited = tokio::io::AsyncReadExt::take(&mut *reader, u64::from(declared));
    let received = tokio::io::AsyncReadExt::read_to_end(&mut limited, &mut payload).await?;
    if received != expected {
        return Err(io::Error::new(
            io::ErrorKind::UnexpectedEof,
            "IPC frame truncated: stream ended before the full payload arrived",
        ));
    }
    Ok(payload)
}
65
66/// Commands sent from the primary test runner to the spawned worker processes.
67#[derive(Debug, BinaryCodec)]
68pub enum IpcCommand {
69    RunTest {
70        name: String,
71        crate_name: String,
72        module_path: String,
73    },
74    /// Provide the wire bytes for a `Cloneable` dependency to the worker.
75    /// Sent before any test that requires the dep. The `dep_id` is the
76    /// dep's fully-qualified id (`{crate}::{module}::{name}`) so that
77    /// same-named deps registered in different modules don't collide.
78    /// Workers cache the bytes and pass them to the worker reconstructor
79    /// when materializing.
80    ProvideCloneable { dep_id: String, wire_bytes: Vec<u8> },
81    /// Provide the descriptor bytes for a `Hosted` dependency to a worker.
82    /// Same shape as [`Self::ProvideCloneable`]; on the worker side the
83    /// bytes are fed to `HostedDep::from_descriptor` (via the registered
84    /// worker reconstructor) instead of being treated as the dep value
85    /// directly. The `dep_id` is the dep's fully-qualified id.
86    ProvideHostedDescriptor { dep_id: String, wire_bytes: Vec<u8> },
87    /// Phase 1C: parent's response to a worker-initiated
88    /// [`IpcResponse::HostedRpcCall`]. Carries the same `request_id`
89    /// echoed back so the worker's stub can match the reply to the
90    /// outstanding in-flight call. `body` is `Ok(result_bytes)` if the
91    /// owner-side dispatcher succeeded, or `Err(message)` if it failed
92    /// (owner panic, unknown method, codec error, …).
93    HostedRpcReply {
94        request_id: u64,
95        body: HostedRpcReplyBody,
96    },
97}
98
99/// Body of a [`IpcCommand::HostedRpcReply`]. Either the serialized return
100/// value of the owner's method, or a human-readable error describing why
101/// dispatch failed.
102#[derive(Debug, BinaryCodec)]
103pub enum HostedRpcReplyBody {
104    Ok { result_bytes: Vec<u8> },
105    Err { message: String },
106}
107
108#[derive(Debug, BinaryCodec)]
109pub enum SerializableTestResult {
110    Passed {
111        exec_time: Duration,
112    },
113    Benchmarked {
114        exec_time: Duration,
115        ns_iter_summ: Summary,
116        mb_s: usize,
117    },
118    Failed {
119        exec_time: Duration,
120        rendered_failure_cause: String,
121    },
122    Ignored,
123}
124
125impl SerializableTestResult {
126    pub fn into_test_result(
127        self,
128        stdout: Vec<CapturedOutput>,
129        stderr: Vec<CapturedOutput>,
130    ) -> TestResult {
131        let mut captured = [stdout, stderr].concat();
132        captured.sort();
133
134        let mut result: TestResult = self.into();
135        result.set_captured_output(captured);
136        result
137    }
138}
139
140impl From<&TestResult> for SerializableTestResult {
141    fn from(result: &TestResult) -> Self {
142        match &result {
143            TestResult::Passed { exec_time, .. } => SerializableTestResult::Passed {
144                exec_time: *exec_time,
145            },
146            TestResult::Benchmarked {
147                exec_time,
148                ns_iter_summ,
149                mb_s,
150                ..
151            } => SerializableTestResult::Benchmarked {
152                exec_time: *exec_time,
153                ns_iter_summ: *ns_iter_summ,
154                mb_s: *mb_s,
155            },
156            TestResult::Failed {
157                exec_time, cause, ..
158            } => SerializableTestResult::Failed {
159                exec_time: *exec_time,
160                rendered_failure_cause: cause.render(),
161            },
162            TestResult::Ignored { .. } => SerializableTestResult::Ignored,
163        }
164    }
165}
166
167impl From<SerializableTestResult> for TestResult {
168    fn from(result: SerializableTestResult) -> Self {
169        match result {
170            SerializableTestResult::Passed { exec_time } => TestResult::passed(exec_time),
171            SerializableTestResult::Failed {
172                exec_time,
173                rendered_failure_cause,
174            } => TestResult::failed(
175                exec_time,
176                FailureCause::HarnessError(rendered_failure_cause),
177            ),
178            SerializableTestResult::Ignored => TestResult::ignored(),
179            SerializableTestResult::Benchmarked {
180                exec_time,
181                ns_iter_summ,
182                mb_s,
183            } => TestResult::benchmarked(exec_time, ns_iter_summ, mb_s),
184        }
185    }
186}
187
188/// Responses sent from the spawned worker processes to the primary test
189/// runner.
190#[derive(Debug, BinaryCodec)]
191pub enum IpcResponse {
192    TestFinished {
193        result: SerializableTestResult,
194        finish_marker: String,
195    },
196    /// Acknowledges a [`IpcCommand::ProvideCloneable`]. Echoes back the
197    /// fully-qualified `dep_id` the command carried.
198    CloneableAccepted { dep_id: String },
199    /// Acknowledges a [`IpcCommand::ProvideHostedDescriptor`]. Echoes back
200    /// the fully-qualified `dep_id`.
201    HostedDescriptorAccepted { dep_id: String },
202    /// Phase 1C: worker-initiated remote procedure call against a
203    /// `HostedRpc` dep owned by the parent. The worker's stub assigns a
204    /// monotonically-increasing `request_id`, serializes its method
205    /// arguments into `args_bytes`, and writes this frame on the shared
206    /// IPC stream. The parent's `Worker::run_test` loop dispatches the
207    /// call to the right owner via `dep_id`, and responds with a matching
208    /// [`IpcCommand::HostedRpcReply`].
209    HostedRpcCall {
210        request_id: u64,
211        dep_id: String,
212        method_idx: u32,
213        args_bytes: Vec<u8>,
214    },
215}
216
217pub fn ipc_name<'s>(name: String) -> Name<'s> {
218    if GenericNamespaced::is_supported() {
219        name.to_ns_name::<GenericNamespaced>()
220            .expect("Invalid local socket name")
221    } else {
222        name.to_fs_name::<GenericFilePath>()
223            .expect("Invalid local socket name")
224    }
225}
226
#[cfg(test)]
mod tests {
    use super::*;
    use std::io::Cursor;

    /// 200 KiB — larger than the old u16 length prefix could express.
    const LARGE_PAYLOAD_LEN: usize = 200 * 1024;

    /// Builds `len` bytes following a repeating `0..251` pattern.
    fn patterned_payload(len: usize) -> Vec<u8> {
        (0..len).map(|i| (i % 251) as u8).collect()
    }

    /// Writes `data` as one frame and returns the raw framed bytes.
    fn framed(data: &[u8]) -> Vec<u8> {
        let mut buf: Vec<u8> = Vec::new();
        write_frame(&mut buf, data).expect("write");
        buf
    }

    /// Reads a single frame back out of `buf`.
    fn unframed(buf: &Vec<u8>) -> Vec<u8> {
        let mut cursor = Cursor::new(buf);
        read_frame(&mut cursor).expect("read")
    }

    #[test]
    fn write_then_read_round_trip_empty() {
        let buf = framed(&[]);
        let payload = unframed(&buf);
        assert!(payload.is_empty());
    }

    #[test]
    fn write_then_read_round_trip_small() {
        let data = b"hello, world";
        let buf = framed(data);
        let payload = unframed(&buf);
        assert_eq!(payload, data);
    }

    #[test]
    fn write_then_read_round_trip_large_payload_exceeds_u16() {
        let data = patterned_payload(LARGE_PAYLOAD_LEN);
        let buf = framed(&data);
        assert_eq!(buf.len(), FRAME_LEN_BYTES + data.len());
        let payload = unframed(&buf);
        assert_eq!(payload.len(), data.len());
        assert_eq!(payload, data);
    }

    #[test]
    fn read_frame_propagates_eof() {
        let buf: Vec<u8> = Vec::new();
        let mut cursor = Cursor::new(&buf);
        let err = read_frame(&mut cursor).expect_err("must fail on empty");
        assert_eq!(err.kind(), std::io::ErrorKind::UnexpectedEof);
    }

    #[cfg(feature = "tokio")]
    #[test]
    fn async_round_trip_large_payload_exceeds_u16() {
        let runtime = tokio::runtime::Runtime::new().unwrap();
        runtime.block_on(async {
            let data = patterned_payload(LARGE_PAYLOAD_LEN);
            let mut buf: Vec<u8> = Vec::new();
            write_frame_async(&mut buf, &data).await.expect("write");
            assert_eq!(buf.len(), FRAME_LEN_BYTES + data.len());
            let mut slice: &[u8] = &buf;
            let payload = read_frame_async(&mut slice).await.expect("read");
            assert_eq!(payload, data);
        });
    }
}