1use crate::internal::{CapturedOutput, FailureCause, TestResult};
2use crate::stats::Summary;
3use desert_rust::BinaryCodec;
4use interprocess::local_socket::{
5 GenericFilePath, GenericNamespaced, Name, NameType, ToFsName, ToNsName,
6};
7use std::io::{self, Read, Write};
8use std::time::Duration;
9
/// Width, in bytes, of the length prefix that precedes every IPC frame.
///
/// The frame readers in this file size their header buffer with this
/// constant, and the frame writers emit a little-endian `u32`, which is
/// exactly four bytes wide.
pub const FRAME_LEN_BYTES: usize = 4;
14
/// Writes a single length-prefixed frame to `writer`.
///
/// The frame consists of the payload length encoded as a little-endian
/// `u32`, immediately followed by the payload bytes themselves.
///
/// # Errors
///
/// Returns an [`io::ErrorKind::InvalidData`] error when the payload length
/// does not fit in a `u32`; otherwise propagates any error reported by
/// `writer`.
pub fn write_frame<W: Write>(writer: &mut W, payload: &[u8]) -> io::Result<()> {
    let header = match u32::try_from(payload.len()) {
        Ok(len) => len.to_le_bytes(),
        Err(_) => {
            return Err(io::Error::new(
                io::ErrorKind::InvalidData,
                "IPC payload size exceeds u32::MAX",
            ));
        }
    };

    writer.write_all(&header)?;
    writer.write_all(payload)?;
    Ok(())
}
27
28pub fn read_frame<R: Read>(reader: &mut R) -> io::Result<Vec<u8>> {
30 let mut len_bytes = [0u8; FRAME_LEN_BYTES];
31 reader.read_exact(&mut len_bytes)?;
32 let len = u32::from_le_bytes(len_bytes) as usize;
33 let mut payload = vec![0; len];
34 reader.read_exact(&mut payload)?;
35 Ok(payload)
36}
37
/// Asynchronous counterpart of the blocking frame writer.
///
/// Emits the payload length as a little-endian `u32` and then the payload
/// bytes, awaiting each write on `writer`.
///
/// # Errors
///
/// Returns an [`io::ErrorKind::InvalidData`] error when the payload length
/// does not fit in a `u32`; otherwise propagates any error reported by
/// `writer`.
#[cfg(feature = "tokio")]
pub async fn write_frame_async<W>(writer: &mut W, payload: &[u8]) -> io::Result<()>
where
    W: tokio::io::AsyncWriteExt + Unpin,
{
    let header = match u32::try_from(payload.len()) {
        Ok(len) => len.to_le_bytes(),
        Err(_) => {
            return Err(io::Error::new(
                io::ErrorKind::InvalidData,
                "IPC payload size exceeds u32::MAX",
            ));
        }
    };

    writer.write_all(&header).await?;
    writer.write_all(payload).await?;
    Ok(())
}
52
/// Asynchronous counterpart of the blocking frame reader.
///
/// Reads a little-endian `u32` length (`FRAME_LEN_BYTES` bytes) and then
/// exactly that many payload bytes from `reader`. Only the bytes belonging
/// to this frame are consumed.
///
/// The length prefix is not trusted for allocation: at most a small, fixed
/// amount of memory is reserved up front and the buffer then grows only as
/// payload bytes actually arrive, so a corrupt prefix cannot force a
/// multi-gigabyte allocation before the read fails.
///
/// # Errors
///
/// Returns [`io::ErrorKind::UnexpectedEof`] when the stream ends before the
/// header or the announced payload has been fully read; otherwise propagates
/// any error reported by `reader`.
#[cfg(feature = "tokio")]
pub async fn read_frame_async<R>(reader: &mut R) -> io::Result<Vec<u8>>
where
    R: tokio::io::AsyncReadExt + Unpin,
{
    // Needed so the extension methods resolve on `&mut R` and on `Take<_>`,
    // not only on the bounded type parameter `R` itself.
    use tokio::io::AsyncReadExt as _;

    // Upper bound on the capacity reserved before any payload byte was seen.
    const MAX_PREALLOC: usize = 64 * 1024;

    let mut len_bytes = [0u8; FRAME_LEN_BYTES];
    reader.read_exact(&mut len_bytes).await?;
    let len = u32::from_le_bytes(len_bytes);
    let expected = u64::from(len);

    let reserve = usize::try_from(len).map_or(MAX_PREALLOC, |n| n.min(MAX_PREALLOC));
    let mut payload = Vec::with_capacity(reserve);

    // `take` stops at the frame boundary; `read_to_end` stops early only if
    // the underlying stream ends first.
    let mut limited = (&mut *reader).take(expected);
    let received = limited.read_to_end(&mut payload).await?;
    if u64::try_from(received).ok() != Some(expected) {
        return Err(io::Error::new(
            io::ErrorKind::UnexpectedEof,
            "IPC frame truncated: stream ended before the full payload was read",
        ));
    }
    Ok(payload)
}
65
/// Command messages of the IPC protocol.
///
/// The type derives `BinaryCodec`, presumably so values can be encoded into
/// the payload of a frame — TODO confirm against the call sites.
///
/// NOTE(review): the derived codec likely depends on variant and field
/// order; confirm before reordering anything here.
#[derive(Debug, BinaryCodec)]
pub enum IpcCommand {
    /// Request to run one test, identified by its name, crate name and
    /// module path.
    RunTest {
        name: String,
        crate_name: String,
        module_path: String,
    },
    /// Supplies the encoded bytes of a dependency identified by `dep_id`.
    /// NOTE(review): "cloneable" presumably refers to a dependency kind
    /// defined elsewhere — confirm.
    ProvideCloneable { dep_id: String, wire_bytes: Vec<u8> },
    /// Supplies the encoded descriptor of a hosted dependency identified by
    /// `dep_id`. NOTE(review): descriptor format is not visible here.
    ProvideHostedDescriptor { dep_id: String, wire_bytes: Vec<u8> },
    /// Reply to a hosted RPC call; `request_id` presumably echoes the id of
    /// the matching `IpcResponse::HostedRpcCall` — TODO confirm.
    HostedRpcReply {
        request_id: u64,
        body: HostedRpcReplyBody,
    },
}
98
/// Outcome carried in the `body` field of `IpcCommand::HostedRpcReply`.
#[derive(Debug, BinaryCodec)]
pub enum HostedRpcReplyBody {
    /// The call succeeded; `result_bytes` holds the encoded result.
    /// NOTE(review): the encoding of these bytes is not visible here.
    Ok { result_bytes: Vec<u8> },
    /// The call failed; `message` describes the failure.
    Err { message: String },
}
107
/// Codec-friendly counterpart of [`TestResult`].
///
/// Conversions exist in both directions in this file
/// (`From<&TestResult>` and `From<SerializableTestResult> for TestResult`).
/// Captured output is not part of this type; it is attached separately via
/// `into_test_result`.
#[derive(Debug, BinaryCodec)]
pub enum SerializableTestResult {
    /// The test passed.
    Passed {
        exec_time: Duration,
    },
    /// A benchmark completed, with its summary statistics.
    /// NOTE(review): units of `mb_s` are presumably MB/s — confirm.
    Benchmarked {
        exec_time: Duration,
        ns_iter_summ: Summary,
        mb_s: usize,
    },
    /// The test failed. Only the rendered text of the failure cause is kept
    /// (see `FailureCause::render` in the `From<&TestResult>` impl).
    Failed {
        exec_time: Duration,
        rendered_failure_cause: String,
    },
    /// The test was ignored.
    Ignored,
}
124
125impl SerializableTestResult {
126 pub fn into_test_result(
127 self,
128 stdout: Vec<CapturedOutput>,
129 stderr: Vec<CapturedOutput>,
130 ) -> TestResult {
131 let mut captured = [stdout, stderr].concat();
132 captured.sort();
133
134 let mut result: TestResult = self.into();
135 result.set_captured_output(captured);
136 result
137 }
138}
139
140impl From<&TestResult> for SerializableTestResult {
141 fn from(result: &TestResult) -> Self {
142 match &result {
143 TestResult::Passed { exec_time, .. } => SerializableTestResult::Passed {
144 exec_time: *exec_time,
145 },
146 TestResult::Benchmarked {
147 exec_time,
148 ns_iter_summ,
149 mb_s,
150 ..
151 } => SerializableTestResult::Benchmarked {
152 exec_time: *exec_time,
153 ns_iter_summ: *ns_iter_summ,
154 mb_s: *mb_s,
155 },
156 TestResult::Failed {
157 exec_time, cause, ..
158 } => SerializableTestResult::Failed {
159 exec_time: *exec_time,
160 rendered_failure_cause: cause.render(),
161 },
162 TestResult::Ignored { .. } => SerializableTestResult::Ignored,
163 }
164 }
165}
166
167impl From<SerializableTestResult> for TestResult {
168 fn from(result: SerializableTestResult) -> Self {
169 match result {
170 SerializableTestResult::Passed { exec_time } => TestResult::passed(exec_time),
171 SerializableTestResult::Failed {
172 exec_time,
173 rendered_failure_cause,
174 } => TestResult::failed(
175 exec_time,
176 FailureCause::HarnessError(rendered_failure_cause),
177 ),
178 SerializableTestResult::Ignored => TestResult::ignored(),
179 SerializableTestResult::Benchmarked {
180 exec_time,
181 ns_iter_summ,
182 mb_s,
183 } => TestResult::benchmarked(exec_time, ns_iter_summ, mb_s),
184 }
185 }
186}
187
/// Response messages of the IPC protocol.
///
/// The type derives `BinaryCodec`, presumably so values can be encoded into
/// the payload of a frame — TODO confirm against the call sites.
///
/// NOTE(review): the derived codec likely depends on variant and field
/// order; confirm before reordering anything here.
#[derive(Debug, BinaryCodec)]
pub enum IpcResponse {
    /// A test finished with the given result.
    /// NOTE(review): the purpose of `finish_marker` is not visible in this
    /// file — confirm with the sender/receiver code.
    TestFinished {
        result: SerializableTestResult,
        finish_marker: String,
    },
    /// Presumably acknowledges `IpcCommand::ProvideCloneable` for `dep_id`
    /// — TODO confirm.
    CloneableAccepted { dep_id: String },
    /// Presumably acknowledges `IpcCommand::ProvideHostedDescriptor` for
    /// `dep_id` — TODO confirm.
    HostedDescriptorAccepted { dep_id: String },
    /// Call of method number `method_idx` on the hosted dependency `dep_id`
    /// with encoded arguments; presumably answered by
    /// `IpcCommand::HostedRpcReply` carrying the same `request_id` — TODO
    /// confirm.
    HostedRpcCall {
        request_id: u64,
        dep_id: String,
        method_idx: u32,
        args_bytes: Vec<u8>,
    },
}
216
217pub fn ipc_name<'s>(name: String) -> Name<'s> {
218 if GenericNamespaced::is_supported() {
219 name.to_ns_name::<GenericNamespaced>()
220 .expect("Invalid local socket name")
221 } else {
222 name.to_fs_name::<GenericFilePath>()
223 .expect("Invalid local socket name")
224 }
225}
226
#[cfg(test)]
mod tests {
    use super::*;
    use std::io::Cursor;

    /// Builds a deterministic, non-constant payload of `len` bytes so that
    /// misplaced or reordered bytes are detected by equality checks.
    fn patterned_payload(len: usize) -> Vec<u8> {
        (0..len).map(|i| (i % 251) as u8).collect()
    }

    #[test]
    fn write_then_read_round_trip_empty() {
        let mut buf: Vec<u8> = Vec::new();
        write_frame(&mut buf, &[]).expect("write");
        let mut cursor = Cursor::new(&buf);
        let payload = read_frame(&mut cursor).expect("read");
        assert!(payload.is_empty());
    }

    #[test]
    fn write_then_read_round_trip_small() {
        let mut buf: Vec<u8> = Vec::new();
        let data = b"hello, world";
        write_frame(&mut buf, data).expect("write");
        let mut cursor = Cursor::new(&buf);
        let payload = read_frame(&mut cursor).expect("read");
        assert_eq!(payload, data);
    }

    #[test]
    fn write_then_read_round_trip_large_payload_exceeds_u16() {
        // 200 KiB does not fit in a 16-bit length, so this exercises the
        // full 32-bit prefix.
        let data = patterned_payload(200 * 1024);
        let mut buf: Vec<u8> = Vec::new();
        write_frame(&mut buf, &data).expect("write");
        assert_eq!(buf.len(), FRAME_LEN_BYTES + data.len());
        let mut cursor = Cursor::new(&buf);
        let payload = read_frame(&mut cursor).expect("read");
        assert_eq!(payload.len(), data.len());
        assert_eq!(payload, data);
    }

    #[test]
    fn read_frame_propagates_eof() {
        let buf: Vec<u8> = Vec::new();
        let mut cursor = Cursor::new(&buf);
        let err = read_frame(&mut cursor).expect_err("must fail on empty");
        assert_eq!(err.kind(), std::io::ErrorKind::UnexpectedEof);
    }

    #[test]
    fn read_frame_reports_truncated_payload() {
        // Header announces 10 bytes but only 3 follow.
        let mut buf: Vec<u8> = 10u32.to_le_bytes().to_vec();
        buf.extend_from_slice(&[1, 2, 3]);
        let mut cursor = Cursor::new(&buf);
        let err = read_frame(&mut cursor).expect_err("must fail on short payload");
        assert_eq!(err.kind(), std::io::ErrorKind::UnexpectedEof);
    }

    #[test]
    fn read_frame_reads_consecutive_frames_in_order() {
        let mut buf: Vec<u8> = Vec::new();
        write_frame(&mut buf, b"first").expect("write");
        write_frame(&mut buf, &[]).expect("write");
        write_frame(&mut buf, b"second").expect("write");

        let mut cursor = Cursor::new(&buf);
        assert_eq!(read_frame(&mut cursor).expect("read"), b"first");
        assert!(read_frame(&mut cursor).expect("read").is_empty());
        assert_eq!(read_frame(&mut cursor).expect("read"), b"second");
        let err = read_frame(&mut cursor).expect_err("stream is exhausted");
        assert_eq!(err.kind(), std::io::ErrorKind::UnexpectedEof);
    }

    #[cfg(feature = "tokio")]
    #[test]
    fn async_round_trip_large_payload_exceeds_u16() {
        let runtime = tokio::runtime::Runtime::new().unwrap();
        runtime.block_on(async {
            let data = patterned_payload(200 * 1024);
            let mut buf: Vec<u8> = Vec::new();
            write_frame_async(&mut buf, &data).await.expect("write");
            assert_eq!(buf.len(), FRAME_LEN_BYTES + data.len());
            let mut slice: &[u8] = &buf;
            let payload = read_frame_async(&mut slice).await.expect("read");
            assert_eq!(payload, data);
        });
    }
}