venus_core/ipc/
protocol.rs

1//! IPC protocol messages for Venus worker processes.
2//!
3//! Uses length-prefixed rkyv messages over stdin/stdout.
4//! Format: 4-byte length (u32 LE) + rkyv-encoded message.
5
6use std::io::{Read, Write};
7
8use rkyv::{Archive, Deserialize, Serialize};
9
10use crate::error::{Error, Result};
11
12/// Command sent from parent to worker process.
13#[derive(Debug, Clone, Archive, Serialize, Deserialize)]
14pub enum WorkerCommand {
15    /// Load a compiled cell's dynamic library.
16    LoadCell {
17        /// Path to the dylib file.
18        dylib_path: String,
19        /// Number of dependencies for FFI dispatch.
20        dep_count: usize,
21        /// Entry point symbol name.
22        entry_symbol: String,
23        /// Cell name for error reporting.
24        name: String,
25    },
26
27    /// Execute the loaded cell with given inputs.
28    Execute {
29        /// Serialized inputs (rkyv bytes for each dependency).
30        inputs: Vec<Vec<u8>>,
31        /// Widget values as JSON (widget_id -> value).
32        /// Empty if no widgets.
33        widget_values_json: Vec<u8>,
34    },
35
36    /// Shutdown the worker process gracefully.
37    Shutdown,
38
39    /// Ping to check if worker is alive.
40    Ping,
41}
42
43/// Response sent from worker to parent process.
44#[derive(Debug, Clone, Archive, Serialize, Deserialize)]
45pub enum WorkerResponse {
46    /// Cell loaded successfully.
47    Loaded,
48
49    /// Execution completed successfully with output.
50    Output {
51        /// Serialized output bytes (display_len + display + rkyv data).
52        bytes: Vec<u8>,
53        /// Widget definitions as JSON.
54        /// Empty if no widgets were registered.
55        widgets_json: Vec<u8>,
56    },
57
58    /// Execution failed with an error.
59    Error {
60        /// Error message.
61        message: String,
62    },
63
64    /// Worker panicked during execution.
65    Panic {
66        /// Panic message if available.
67        message: String,
68    },
69
70    /// Response to Ping command.
71    Pong,
72
73    /// Acknowledgement of shutdown request.
74    ShuttingDown,
75}
76
77/// Write a message to a writer using length-prefixed rkyv encoding.
78pub fn write_message<W: Write>(
79    writer: &mut W,
80    message: &impl for<'a> Serialize<
81        rkyv::rancor::Strategy<
82            rkyv::ser::Serializer<
83                rkyv::util::AlignedVec,
84                rkyv::ser::allocator::ArenaHandle<'a>,
85                rkyv::ser::sharing::Share,
86            >,
87            rkyv::rancor::Error,
88        >,
89    >,
90) -> Result<()> {
91    let bytes = rkyv::to_bytes::<rkyv::rancor::Error>(message)
92        .map_err(|e| Error::Serialization(format!("Failed to encode IPC message: {}", e)))?;
93
94    let len = bytes.len() as u32;
95    writer
96        .write_all(&len.to_le_bytes())
97        .map_err(|e| Error::Ipc(format!("Failed to write IPC message length: {}", e)))?;
98    writer
99        .write_all(&bytes)
100        .map_err(|e| Error::Ipc(format!("Failed to write IPC message body: {}", e)))?;
101    writer
102        .flush()
103        .map_err(|e| Error::Ipc(format!("Failed to flush IPC stream: {}", e)))?;
104
105    Ok(())
106}
107
108/// Read a message from a reader using length-prefixed rkyv encoding.
109///
110/// # Safety
111///
112/// Uses unchecked deserialization for performance. Only safe when reading from
113/// trusted sources (our own worker processes or state files).
114pub fn read_message<R: Read, T>(reader: &mut R) -> Result<T>
115where
116    T: Archive,
117    T::Archived: Deserialize<T, rkyv::rancor::Strategy<rkyv::de::Pool, rkyv::rancor::Error>>,
118{
119    let mut len_bytes = [0u8; 4];
120    reader
121        .read_exact(&mut len_bytes)
122        .map_err(|e| Error::Ipc(format!("Failed to read IPC message length: {}", e)))?;
123    let len = u32::from_le_bytes(len_bytes) as usize;
124
125    // Sanity check: reject absurdly large messages (100MB)
126    if len > 100 * 1024 * 1024 {
127        return Err(Error::Ipc(format!(
128            "IPC message too large: {} bytes",
129            len
130        )));
131    }
132
133    let mut bytes = vec![0u8; len];
134    reader
135        .read_exact(&mut bytes)
136        .map_err(|e| Error::Ipc(format!("Failed to read IPC message body: {}", e)))?;
137
138    // SAFETY: We trust data from our own worker processes and state files.
139    // Using unchecked deserialization avoids CheckBytes trait complexity.
140    let message = unsafe { rkyv::from_bytes_unchecked::<T, rkyv::rancor::Error>(&bytes) }
141        .map_err(|e| Error::Serialization(format!("Failed to decode IPC message: {}", e)))?;
142
143    Ok(message)
144}
145
146#[cfg(test)]
147mod tests {
148    use super::*;
149    use std::io::Cursor;
150
151    #[test]
152    fn test_command_roundtrip() {
153        let cmd = WorkerCommand::LoadCell {
154            dylib_path: "/tmp/cell.so".to_string(),
155            dep_count: 2,
156            entry_symbol: "venus_entry_my_cell".to_string(),
157            name: "my_cell".to_string(),
158        };
159
160        let mut buf = Vec::new();
161        write_message(&mut buf, &cmd).unwrap();
162
163        let mut cursor = Cursor::new(buf);
164        let decoded: WorkerCommand = read_message(&mut cursor).unwrap();
165
166        match decoded {
167            WorkerCommand::LoadCell {
168                dylib_path,
169                dep_count,
170                entry_symbol,
171                name,
172            } => {
173                assert_eq!(dylib_path, "/tmp/cell.so");
174                assert_eq!(dep_count, 2);
175                assert_eq!(entry_symbol, "venus_entry_my_cell");
176                assert_eq!(name, "my_cell");
177            }
178            _ => panic!("Wrong command type"),
179        }
180    }
181
182    #[test]
183    fn test_response_roundtrip() {
184        let resp = WorkerResponse::Output {
185            bytes: vec![1, 2, 3, 4, 5],
186            widgets_json: vec![],
187        };
188
189        let mut buf = Vec::new();
190        write_message(&mut buf, &resp).unwrap();
191
192        let mut cursor = Cursor::new(buf);
193        let decoded: WorkerResponse = read_message(&mut cursor).unwrap();
194
195        match decoded {
196            WorkerResponse::Output { bytes, widgets_json } => {
197                assert_eq!(bytes, vec![1, 2, 3, 4, 5]);
198                assert!(widgets_json.is_empty());
199            }
200            _ => panic!("Wrong response type"),
201        }
202    }
203
204    #[test]
205    fn test_execute_command_roundtrip() {
206        let cmd = WorkerCommand::Execute {
207            inputs: vec![vec![1, 2, 3], vec![4, 5, 6]],
208            widget_values_json: vec![],
209        };
210
211        let mut buf = Vec::new();
212        write_message(&mut buf, &cmd).unwrap();
213
214        let mut cursor = Cursor::new(buf);
215        let decoded: WorkerCommand = read_message(&mut cursor).unwrap();
216
217        match decoded {
218            WorkerCommand::Execute { inputs, widget_values_json } => {
219                assert_eq!(inputs.len(), 2);
220                assert_eq!(inputs[0], vec![1, 2, 3]);
221                assert_eq!(inputs[1], vec![4, 5, 6]);
222                assert!(widget_values_json.is_empty());
223            }
224            _ => panic!("Wrong command type"),
225        }
226    }
227
228    #[test]
229    fn test_empty_execute_command() {
230        // This tests the case that's failing in process_isolation tests
231        let cmd = WorkerCommand::Execute {
232            inputs: vec![],
233            widget_values_json: vec![],
234        };
235
236        let mut buf = Vec::new();
237        write_message(&mut buf, &cmd).unwrap();
238        eprintln!("Empty Execute command serializes to {} bytes", buf.len());
239
240        let mut cursor = Cursor::new(buf);
241        let decoded: WorkerCommand = read_message(&mut cursor).unwrap();
242
243        match decoded {
244            WorkerCommand::Execute { inputs, widget_values_json } => {
245                assert!(inputs.is_empty());
246                assert!(widget_values_json.is_empty());
247            }
248            _ => panic!("Wrong command type"),
249        }
250    }
251
252    #[test]
253    fn test_loaded_response_size() {
254        let response = WorkerResponse::Loaded;
255
256        let mut buf = Vec::new();
257        write_message(&mut buf, &response).unwrap();
258        eprintln!("Loaded response serializes to {} bytes total ({} payload)",
259                  buf.len(), buf.len() - 4);
260
261        let mut cursor = Cursor::new(buf);
262        let decoded: WorkerResponse = read_message(&mut cursor).unwrap();
263
264        matches!(decoded, WorkerResponse::Loaded);
265    }
266
267    #[test]
268    fn test_error_response_roundtrip() {
269        let resp = WorkerResponse::Error {
270            message: "Division by zero".to_string(),
271        };
272
273        let mut buf = Vec::new();
274        write_message(&mut buf, &resp).unwrap();
275
276        let mut cursor = Cursor::new(buf);
277        let decoded: WorkerResponse = read_message(&mut cursor).unwrap();
278
279        match decoded {
280            WorkerResponse::Error { message } => {
281                assert_eq!(message, "Division by zero");
282            }
283            _ => panic!("Wrong response type"),
284        }
285    }
286
287    #[test]
288    fn test_panic_response_roundtrip() {
289        let resp = WorkerResponse::Panic {
290            message: "thread 'main' panicked at 'assertion failed'".to_string(),
291        };
292
293        let mut buf = Vec::new();
294        write_message(&mut buf, &resp).unwrap();
295
296        let mut cursor = Cursor::new(buf);
297        let decoded: WorkerResponse = read_message(&mut cursor).unwrap();
298
299        match decoded {
300            WorkerResponse::Panic { message } => {
301                assert!(message.contains("panicked"));
302            }
303            _ => panic!("Wrong response type"),
304        }
305    }
306
307    #[test]
308    fn test_shutdown_command() {
309        let cmd = WorkerCommand::Shutdown;
310
311        let mut buf = Vec::new();
312        write_message(&mut buf, &cmd).unwrap();
313
314        let mut cursor = Cursor::new(buf);
315        let decoded: WorkerCommand = read_message(&mut cursor).unwrap();
316
317        matches!(decoded, WorkerCommand::Shutdown);
318    }
319
320    #[test]
321    fn test_shutting_down_response() {
322        let resp = WorkerResponse::ShuttingDown;
323
324        let mut buf = Vec::new();
325        write_message(&mut buf, &resp).unwrap();
326
327        let mut cursor = Cursor::new(buf);
328        let decoded: WorkerResponse = read_message(&mut cursor).unwrap();
329
330        matches!(decoded, WorkerResponse::ShuttingDown);
331    }
332
333    #[test]
334    fn test_ping_pong() {
335        let cmd = WorkerCommand::Ping;
336
337        let mut cmd_buf = Vec::new();
338        write_message(&mut cmd_buf, &cmd).unwrap();
339
340        let mut cursor = Cursor::new(cmd_buf);
341        let decoded_cmd: WorkerCommand = read_message(&mut cursor).unwrap();
342        assert!(matches!(decoded_cmd, WorkerCommand::Ping));
343
344        let resp = WorkerResponse::Pong;
345
346        let mut resp_buf = Vec::new();
347        write_message(&mut resp_buf, &resp).unwrap();
348
349        let mut cursor = Cursor::new(resp_buf);
350        let decoded_resp: WorkerResponse = read_message(&mut cursor).unwrap();
351        assert!(matches!(decoded_resp, WorkerResponse::Pong));
352    }
353
354    #[test]
355    fn test_output_with_widgets() {
356        let resp = WorkerResponse::Output {
357            bytes: vec![1, 2, 3, 4, 5],
358            widgets_json: b"{\"slider_1\": {\"type\": \"slider\", \"value\": 50}}".to_vec(),
359        };
360
361        let mut buf = Vec::new();
362        write_message(&mut buf, &resp).unwrap();
363
364        let mut cursor = Cursor::new(buf);
365        let decoded: WorkerResponse = read_message(&mut cursor).unwrap();
366
367        match decoded {
368            WorkerResponse::Output { bytes, widgets_json } => {
369                assert_eq!(bytes, vec![1, 2, 3, 4, 5]);
370                assert!(!widgets_json.is_empty());
371                assert!(std::str::from_utf8(&widgets_json).unwrap().contains("slider"));
372            }
373            _ => panic!("Wrong response type"),
374        }
375    }
376
377    #[test]
378    fn test_execute_with_widget_values() {
379        let cmd = WorkerCommand::Execute {
380            inputs: vec![vec![1, 2, 3]],
381            widget_values_json: b"{\"slider_1\": 75}".to_vec(),
382        };
383
384        let mut buf = Vec::new();
385        write_message(&mut buf, &cmd).unwrap();
386
387        let mut cursor = Cursor::new(buf);
388        let decoded: WorkerCommand = read_message(&mut cursor).unwrap();
389
390        match decoded {
391            WorkerCommand::Execute { inputs, widget_values_json } => {
392                assert_eq!(inputs.len(), 1);
393                assert!(!widget_values_json.is_empty());
394                assert!(std::str::from_utf8(&widget_values_json).unwrap().contains("75"));
395            }
396            _ => panic!("Wrong command type"),
397        }
398    }
399
400    #[test]
401    fn test_large_output() {
402        // Test with a large output (1MB)
403        let large_bytes: Vec<u8> = (0..1_000_000).map(|i| (i % 256) as u8).collect();
404
405        let resp = WorkerResponse::Output {
406            bytes: large_bytes.clone(),
407            widgets_json: vec![],
408        };
409
410        let mut buf = Vec::new();
411        write_message(&mut buf, &resp).unwrap();
412
413        let mut cursor = Cursor::new(buf);
414        let decoded: WorkerResponse = read_message(&mut cursor).unwrap();
415
416        match decoded {
417            WorkerResponse::Output { bytes, .. } => {
418                assert_eq!(bytes.len(), 1_000_000);
419                assert_eq!(bytes[0], 0);
420                assert_eq!(bytes[255], 255);
421                assert_eq!(bytes[999_999], 63); // 999_999 % 256 = 63
422            }
423            _ => panic!("Wrong response type"),
424        }
425    }
426
427    #[test]
428    fn test_empty_error_message() {
429        // Edge case: error with empty message
430        let resp = WorkerResponse::Error {
431            message: String::new(),
432        };
433
434        let mut buf = Vec::new();
435        write_message(&mut buf, &resp).unwrap();
436
437        let mut cursor = Cursor::new(buf);
438        let decoded: WorkerResponse = read_message(&mut cursor).unwrap();
439
440        match decoded {
441            WorkerResponse::Error { message } => {
442                assert!(message.is_empty());
443            }
444            _ => panic!("Wrong response type"),
445        }
446    }
447
448    #[test]
449    fn test_unicode_in_messages() {
450        let cmd = WorkerCommand::LoadCell {
451            dylib_path: "/tmp/测试_cell.so".to_string(),
452            dep_count: 0,
453            entry_symbol: "entry_测试".to_string(),
454            name: "测试_cell_🚀".to_string(),
455        };
456
457        let mut buf = Vec::new();
458        write_message(&mut buf, &cmd).unwrap();
459
460        let mut cursor = Cursor::new(buf);
461        let decoded: WorkerCommand = read_message(&mut cursor).unwrap();
462
463        match decoded {
464            WorkerCommand::LoadCell { dylib_path, name, .. } => {
465                assert!(dylib_path.contains("测试"));
466                assert!(name.contains("🚀"));
467            }
468            _ => panic!("Wrong command type"),
469        }
470    }
471}