1use std::io::{Read, Write};
7
8use rkyv::{Archive, Deserialize, Serialize};
9
10use crate::error::{Error, Result};
11
12#[derive(Debug, Clone, Archive, Serialize, Deserialize)]
14pub enum WorkerCommand {
15 LoadCell {
17 dylib_path: String,
19 dep_count: usize,
21 entry_symbol: String,
23 name: String,
25 },
26
27 Execute {
29 inputs: Vec<Vec<u8>>,
31 widget_values_json: Vec<u8>,
34 },
35
36 Shutdown,
38
39 Ping,
41}
42
43#[derive(Debug, Clone, Archive, Serialize, Deserialize)]
45pub enum WorkerResponse {
46 Loaded,
48
49 Output {
51 bytes: Vec<u8>,
53 widgets_json: Vec<u8>,
56 },
57
58 Error {
60 message: String,
62 },
63
64 Panic {
66 message: String,
68 },
69
70 Pong,
72
73 ShuttingDown,
75}
76
77pub fn write_message<W: Write>(
79 writer: &mut W,
80 message: &impl for<'a> Serialize<
81 rkyv::rancor::Strategy<
82 rkyv::ser::Serializer<
83 rkyv::util::AlignedVec,
84 rkyv::ser::allocator::ArenaHandle<'a>,
85 rkyv::ser::sharing::Share,
86 >,
87 rkyv::rancor::Error,
88 >,
89 >,
90) -> Result<()> {
91 let bytes = rkyv::to_bytes::<rkyv::rancor::Error>(message)
92 .map_err(|e| Error::Serialization(format!("Failed to encode IPC message: {}", e)))?;
93
94 let len = bytes.len() as u32;
95 writer
96 .write_all(&len.to_le_bytes())
97 .map_err(|e| Error::Ipc(format!("Failed to write IPC message length: {}", e)))?;
98 writer
99 .write_all(&bytes)
100 .map_err(|e| Error::Ipc(format!("Failed to write IPC message body: {}", e)))?;
101 writer
102 .flush()
103 .map_err(|e| Error::Ipc(format!("Failed to flush IPC stream: {}", e)))?;
104
105 Ok(())
106}
107
108pub fn read_message<R: Read, T>(reader: &mut R) -> Result<T>
115where
116 T: Archive,
117 T::Archived: Deserialize<T, rkyv::rancor::Strategy<rkyv::de::Pool, rkyv::rancor::Error>>,
118{
119 let mut len_bytes = [0u8; 4];
120 reader
121 .read_exact(&mut len_bytes)
122 .map_err(|e| Error::Ipc(format!("Failed to read IPC message length: {}", e)))?;
123 let len = u32::from_le_bytes(len_bytes) as usize;
124
125 if len > 100 * 1024 * 1024 {
127 return Err(Error::Ipc(format!(
128 "IPC message too large: {} bytes",
129 len
130 )));
131 }
132
133 let mut bytes = vec![0u8; len];
134 reader
135 .read_exact(&mut bytes)
136 .map_err(|e| Error::Ipc(format!("Failed to read IPC message body: {}", e)))?;
137
138 let message = unsafe { rkyv::from_bytes_unchecked::<T, rkyv::rancor::Error>(&bytes) }
141 .map_err(|e| Error::Serialization(format!("Failed to decode IPC message: {}", e)))?;
142
143 Ok(message)
144}
145
#[cfg(test)]
mod tests {
    //! Round-trip tests for the IPC framing: every message is written with
    //! `write_message` into an in-memory buffer and read back with
    //! `read_message`.

    use super::*;
    use std::io::Cursor;

    /// Encodes `cmd` into an in-memory frame and decodes it again.
    fn roundtrip_command(cmd: &WorkerCommand) -> WorkerCommand {
        let mut frame = Vec::new();
        write_message(&mut frame, cmd).unwrap();
        read_message(&mut Cursor::new(frame)).unwrap()
    }

    /// Encodes `resp` into an in-memory frame and decodes it again.
    fn roundtrip_response(resp: &WorkerResponse) -> WorkerResponse {
        let mut frame = Vec::new();
        write_message(&mut frame, resp).unwrap();
        read_message(&mut Cursor::new(frame)).unwrap()
    }

    #[test]
    fn test_command_roundtrip() {
        let decoded = roundtrip_command(&WorkerCommand::LoadCell {
            dylib_path: "/tmp/cell.so".to_string(),
            dep_count: 2,
            entry_symbol: "venus_entry_my_cell".to_string(),
            name: "my_cell".to_string(),
        });

        match decoded {
            WorkerCommand::LoadCell {
                dylib_path,
                dep_count,
                entry_symbol,
                name,
            } => {
                assert_eq!(dylib_path, "/tmp/cell.so");
                assert_eq!(dep_count, 2);
                assert_eq!(entry_symbol, "venus_entry_my_cell");
                assert_eq!(name, "my_cell");
            }
            other => panic!("Wrong command type: {:?}", other),
        }
    }

    #[test]
    fn test_response_roundtrip() {
        let decoded = roundtrip_response(&WorkerResponse::Output {
            bytes: vec![1, 2, 3, 4, 5],
            widgets_json: vec![],
        });

        match decoded {
            WorkerResponse::Output { bytes, widgets_json } => {
                assert_eq!(bytes, vec![1, 2, 3, 4, 5]);
                assert!(widgets_json.is_empty());
            }
            other => panic!("Wrong response type: {:?}", other),
        }
    }

    #[test]
    fn test_execute_command_roundtrip() {
        let decoded = roundtrip_command(&WorkerCommand::Execute {
            inputs: vec![vec![1, 2, 3], vec![4, 5, 6]],
            widget_values_json: vec![],
        });

        match decoded {
            WorkerCommand::Execute { inputs, widget_values_json } => {
                assert_eq!(inputs.len(), 2);
                assert_eq!(inputs[0], vec![1, 2, 3]);
                assert_eq!(inputs[1], vec![4, 5, 6]);
                assert!(widget_values_json.is_empty());
            }
            other => panic!("Wrong command type: {:?}", other),
        }
    }

    #[test]
    fn test_empty_execute_command() {
        let cmd = WorkerCommand::Execute {
            inputs: vec![],
            widget_values_json: vec![],
        };

        // Encoded by hand (not via the helper) so the frame size can be reported.
        let mut buf = Vec::new();
        write_message(&mut buf, &cmd).unwrap();
        eprintln!("Empty Execute command serializes to {} bytes", buf.len());

        let decoded: WorkerCommand = read_message(&mut Cursor::new(buf)).unwrap();

        match decoded {
            WorkerCommand::Execute { inputs, widget_values_json } => {
                assert!(inputs.is_empty());
                assert!(widget_values_json.is_empty());
            }
            other => panic!("Wrong command type: {:?}", other),
        }
    }

    #[test]
    fn test_loaded_response_size() {
        let response = WorkerResponse::Loaded;

        let mut buf = Vec::new();
        write_message(&mut buf, &response).unwrap();
        // The first 4 bytes of the frame are the length prefix.
        eprintln!(
            "Loaded response serializes to {} bytes total ({} payload)",
            buf.len(),
            buf.len() - 4
        );

        let decoded: WorkerResponse = read_message(&mut Cursor::new(buf)).unwrap();

        // `matches!` alone only yields a bool; it has to be asserted.
        assert!(matches!(decoded, WorkerResponse::Loaded));
    }

    #[test]
    fn test_error_response_roundtrip() {
        let decoded = roundtrip_response(&WorkerResponse::Error {
            message: "Division by zero".to_string(),
        });

        match decoded {
            WorkerResponse::Error { message } => {
                assert_eq!(message, "Division by zero");
            }
            other => panic!("Wrong response type: {:?}", other),
        }
    }

    #[test]
    fn test_panic_response_roundtrip() {
        let decoded = roundtrip_response(&WorkerResponse::Panic {
            message: "thread 'main' panicked at 'assertion failed'".to_string(),
        });

        match decoded {
            WorkerResponse::Panic { message } => {
                assert!(message.contains("panicked"));
            }
            other => panic!("Wrong response type: {:?}", other),
        }
    }

    #[test]
    fn test_shutdown_command() {
        let decoded = roundtrip_command(&WorkerCommand::Shutdown);

        assert!(matches!(decoded, WorkerCommand::Shutdown));
    }

    #[test]
    fn test_shutting_down_response() {
        let decoded = roundtrip_response(&WorkerResponse::ShuttingDown);

        assert!(matches!(decoded, WorkerResponse::ShuttingDown));
    }

    #[test]
    fn test_ping_pong() {
        let decoded_cmd = roundtrip_command(&WorkerCommand::Ping);
        assert!(matches!(decoded_cmd, WorkerCommand::Ping));

        let decoded_resp = roundtrip_response(&WorkerResponse::Pong);
        assert!(matches!(decoded_resp, WorkerResponse::Pong));
    }

    #[test]
    fn test_output_with_widgets() {
        let decoded = roundtrip_response(&WorkerResponse::Output {
            bytes: vec![1, 2, 3, 4, 5],
            widgets_json: b"{\"slider_1\": {\"type\": \"slider\", \"value\": 50}}".to_vec(),
        });

        match decoded {
            WorkerResponse::Output { bytes, widgets_json } => {
                assert_eq!(bytes, vec![1, 2, 3, 4, 5]);
                assert!(!widgets_json.is_empty());
                assert!(std::str::from_utf8(&widgets_json).unwrap().contains("slider"));
            }
            other => panic!("Wrong response type: {:?}", other),
        }
    }

    #[test]
    fn test_execute_with_widget_values() {
        let decoded = roundtrip_command(&WorkerCommand::Execute {
            inputs: vec![vec![1, 2, 3]],
            widget_values_json: b"{\"slider_1\": 75}".to_vec(),
        });

        match decoded {
            WorkerCommand::Execute { inputs, widget_values_json } => {
                assert_eq!(inputs.len(), 1);
                assert!(!widget_values_json.is_empty());
                assert!(std::str::from_utf8(&widget_values_json).unwrap().contains("75"));
            }
            other => panic!("Wrong command type: {:?}", other),
        }
    }

    #[test]
    fn test_large_output() {
        // 1 MB payload whose byte at index `i` is `i % 256`.
        let large_bytes: Vec<u8> = (0..1_000_000).map(|i| (i % 256) as u8).collect();

        let decoded = roundtrip_response(&WorkerResponse::Output {
            bytes: large_bytes,
            widgets_json: vec![],
        });

        match decoded {
            WorkerResponse::Output { bytes, .. } => {
                assert_eq!(bytes.len(), 1_000_000);
                assert_eq!(bytes[0], 0);
                assert_eq!(bytes[255], 255);
                // 999_999 % 256 == 63
                assert_eq!(bytes[999_999], 63);
            }
            other => panic!("Wrong response type: {:?}", other),
        }
    }

    #[test]
    fn test_empty_error_message() {
        let decoded = roundtrip_response(&WorkerResponse::Error {
            message: String::new(),
        });

        match decoded {
            WorkerResponse::Error { message } => {
                assert!(message.is_empty());
            }
            other => panic!("Wrong response type: {:?}", other),
        }
    }

    #[test]
    fn test_unicode_in_messages() {
        let decoded = roundtrip_command(&WorkerCommand::LoadCell {
            dylib_path: "/tmp/测试_cell.so".to_string(),
            dep_count: 0,
            entry_symbol: "entry_测试".to_string(),
            name: "测试_cell_🚀".to_string(),
        });

        match decoded {
            WorkerCommand::LoadCell { dylib_path, name, .. } => {
                assert!(dylib_path.contains("测试"));
                assert!(name.contains("🚀"));
            }
            other => panic!("Wrong command type: {:?}", other),
        }
    }

    #[test]
    fn test_rejects_oversized_length_prefix() {
        // Only a length prefix, announcing one byte more than the 100 MiB limit.
        let announced: u32 = 100 * 1024 * 1024 + 1;
        let mut cursor = Cursor::new(announced.to_le_bytes().to_vec());

        assert!(read_message::<_, WorkerCommand>(&mut cursor).is_err());
    }

    #[test]
    fn test_rejects_truncated_body() {
        // The prefix announces 10 payload bytes but only 2 follow.
        let mut frame = 10u32.to_le_bytes().to_vec();
        frame.extend_from_slice(&[1, 2]);
        let mut cursor = Cursor::new(frame);

        assert!(read_message::<_, WorkerCommand>(&mut cursor).is_err());
    }
}