1use interprocess::unnamed_pipe::{Recver, Sender};
10use nix::errno::Errno;
11use nix::fcntl::OFlag;
12use nix::sys::signal::{self, Signal};
13use nix::unistd::{fork, pipe2, ForkResult, Pid};
14use serde::{Deserialize, Serialize};
15use std::io::prelude::*;
16use std::path::PathBuf;
17use std::sync::mpsc;
18use std::sync::{Arc, Mutex};
19use std::time::{Duration, SystemTime};
20use std::{fmt, mem};
21use thiserror::Error;
22
23pub mod sys {
25 pub use nix::sys::signal::{self, Signal};
26 pub use nix::sys::wait::{waitpid, WaitStatus};
27 pub use nix::unistd::{fork, getpid, ForkResult, Pid};
28}
29
30pub mod ser {
32 use bincode::{deserialize, serialize, Error};
33 use serde::{Deserialize, Serialize};
34 pub fn to_vec<T: Serialize>(value: &T) -> Result<Vec<u8>, Error> {
35 serialize(value)
36 }
37
38 pub fn from_slice<'de, T: Deserialize<'de>>(bytes: &'de [u8]) -> Result<T, Error> {
39 let val = deserialize(bytes)?;
40 Ok(val)
41 }
42}
43
44#[derive(Debug)]
46pub struct ProcessWrapper<T> {
47 child_pid: Pid,
48 start_time: Option<SystemTime>,
49 receiver: Option<Recver>,
50 result: Arc<Mutex<Option<Vec<u8>>>>,
51 _ghost: std::marker::PhantomData<T>,
52}
53
54impl<T> fmt::Display for ProcessWrapper<T> {
55 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
56 write!(f, "Process(pid={})", self.child_pid)
57 }
58}
59
60impl<T> ProcessWrapper<T>
61where
62 T: serde::de::DeserializeOwned,
63{
64 pub fn new(child_pid: Pid, receiver: Recver) -> Self {
66 Self {
67 child_pid,
68 start_time: None,
69 receiver: Some(receiver),
70 result: Arc::new(Mutex::new(None)),
71 _ghost: std::marker::PhantomData,
72 }
73 }
74
75 pub fn wait(&mut self) -> Result<T, ProcessFunError> {
77 self.ensure_start_time()?;
79
80 if let Some(bytes) = self.result.lock().unwrap().take() {
82 return ser::from_slice(&bytes).map_err(ProcessFunError::from);
83 }
84
85 let receiver = self.receiver.take().ok_or_else(|| {
87 ProcessFunError::ProcessError("Process already completed".to_string())
88 })?;
89
90 let mut receiver = receiver;
91 let result_bytes = read_from_pipe(&mut receiver)?;
92 let result: T = ser::from_slice(&result_bytes)?;
93
94 Ok(result)
95 }
96
97 pub fn timeout(&mut self, duration: Duration) -> Result<T, ProcessFunError> {
99 self.ensure_start_time()?;
101
102 let receiver = self.receiver.take().ok_or_else(|| {
104 ProcessFunError::ProcessError("Process already completed".to_string())
105 })?;
106
107 let (tx, rx) = mpsc::channel();
109
110 let result = self.result.clone();
112 std::thread::spawn(move || {
113 let mut receiver = receiver;
114 if let Ok(bytes) = read_from_pipe(&mut receiver) {
115 *result.lock().unwrap() = Some(bytes);
116 let _ = tx.send(true); }
118 });
119
120 match rx.recv_timeout(duration) {
122 Ok(_) => {
123 if let Some(bytes) = self.result.lock().unwrap().take() {
125 return ser::from_slice(&bytes).map_err(ProcessFunError::from);
126 }
127 Err(ProcessFunError::ProcessError(
129 "Process result not found".to_string(),
130 ))
131 }
132 Err(_) => {
133 self.abort()?;
135 Err(ProcessFunError::TimeoutError)
136 }
137 }
138 }
139}
140
141#[inline]
142pub fn stat_pid_start(pid: Pid) -> Result<SystemTime, ProcessFunError> {
143 let proc_path = format!("/proc/{}/stat", pid.as_raw());
144 nix::sys::stat::stat(proc_path.as_str())
145 .map_err(|e| ProcessFunError::ProcessError(format!("Failed to stat process: {}", e)))
146 .and_then(|stat| {
147 SystemTime::UNIX_EPOCH
148 .checked_add(Duration::from_secs(stat.st_ctime as u64))
149 .ok_or_else(|| {
150 ProcessFunError::ProcessError(
151 "Failed to calculate process start time".to_string(),
152 )
153 })
154 })
155}
156
157impl<T> ProcessWrapper<T> {
158 #[inline]
160 fn ensure_start_time(&mut self) -> Result<(), ProcessFunError> {
161 if self.start_time.is_some() {
162 return Ok(());
163 }
164
165 if let Some(receiver) = &mut self.receiver {
166 let start_time = read_start_time_from_pipe(receiver)?;
167 self.start_time = Some(start_time);
168 Ok(())
169 } else {
170 Err(ProcessFunError::ProcessError(
171 "Process already completed".to_string(),
172 ))
173 }
174 }
175
176 #[inline]
178 fn is_same_process(&mut self) -> bool {
179 if self.ensure_start_time().is_err() {
180 return false;
181 }
182 if let Some(start_time) = self.start_time {
184 stat_pid_start(self.child_pid)
185 .map(|stat| stat == start_time)
186 .unwrap_or(false)
187 } else {
188 false
189 }
190 }
191
192 #[inline]
193 fn kill(&mut self) -> Result<(), Errno> {
194 if self.is_same_process() {
196 match signal::kill(self.child_pid, Signal::SIGKILL) {
197 Ok(()) => Ok(()),
198 Err(Errno::ESRCH) => Ok(()), Err(e) => Err(e),
200 }
201 } else {
202 Ok(()) }
204 }
205
206 pub fn abort(&mut self) -> Result<(), ProcessFunError> {
208 let _ = self.receiver.take();
210
211 self.kill().map_err(|e| {
212 ProcessFunError::ProcessError(format!("Failed to send SIGKILL to process: {}", e))
213 })?;
214 Ok(())
215 }
216}
217
218impl<T> Drop for ProcessWrapper<T> {
219 fn drop(&mut self) {
220 let _ = self.receiver.take();
222
223 let _ = self.kill();
225 }
226}
227
228#[inline]
230pub fn create_pipes() -> Result<(Recver, Sender), ProcessFunError> {
231 #[cfg(feature = "debug")]
232 eprintln!("[process-fun-debug] Creating communication pipes");
233
234 let (read_fd, write_fd) = pipe2(OFlag::O_CLOEXEC)
236 .map_err(|e| ProcessFunError::ProcessError(format!("Failed to create pipe: {}", e)))?;
237
238 let recver = Recver::from(read_fd);
240 let sender = Sender::from(write_fd);
241
242 #[cfg(feature = "debug")]
243 eprintln!("[process-fun-debug] Pipes created successfully");
244
245 Ok((recver, sender))
246}
247
248const SYSTEM_TIME_SIZE: usize = mem::size_of::<SystemTime>();
249
250#[inline]
251fn system_time_to_bytes_unsafe(time: SystemTime) -> [u8; SYSTEM_TIME_SIZE] {
252 unsafe { mem::transmute::<SystemTime, [u8; SYSTEM_TIME_SIZE]>(time) }
253}
254
255#[inline]
256fn bytes_to_system_time_unsafe(bytes: [u8; SYSTEM_TIME_SIZE]) -> SystemTime {
257 unsafe { mem::transmute::<[u8; SYSTEM_TIME_SIZE], SystemTime>(bytes) }
258}
259
260#[inline]
262pub fn write_time(fd: &mut Sender, time: SystemTime) -> Result<(), ProcessFunError> {
263 #[cfg(feature = "debug")]
264 eprintln!("[process-fun-debug] Writing start time to pipe");
265
266 let time_bytes = system_time_to_bytes_unsafe(time);
267 fd.write_all(&time_bytes)?;
268
269 #[cfg(feature = "debug")]
270 eprintln!("[process-fun-debug] Successfully wrote start time to pipe");
271
272 Ok(())
273}
274
275#[inline]
277pub fn write_to_pipe(mut fd: Sender, data: &[u8]) -> Result<(), ProcessFunError> {
278 #[cfg(feature = "debug")]
279 eprintln!("[process-fun-debug] Writing {} bytes to pipe", data.len());
280
281 fd.write_all(data)
282 .map_err(|e| ProcessFunError::ProcessError(format!("Failed to write to pipe: {}", e)))?;
283
284 #[cfg(feature = "debug")]
286 eprintln!("[process-fun-debug] Successfully wrote data to pipe");
287
288 Ok(())
289}
290
291#[inline]
293pub fn read_start_time_from_pipe(fd: &mut Recver) -> Result<SystemTime, ProcessFunError> {
294 #[cfg(feature = "debug")]
295 eprintln!("[process-fun-debug] Reading start time from pipe");
296
297 let mut buffer = [0u8; SYSTEM_TIME_SIZE];
298 fd.read_exact(&mut buffer)?;
299 let start_time: SystemTime = bytes_to_system_time_unsafe(buffer);
300
301 #[cfg(feature = "debug")]
302 eprintln!("[process-fun-debug] Read start time from pipe");
303
304 Ok(start_time)
305}
306
307#[inline]
309pub fn read_from_pipe(fd: &mut Recver) -> Result<Vec<u8>, ProcessFunError> {
310 #[cfg(feature = "debug")]
311 eprintln!("[process-fun-debug] Starting to read from pipe");
312
313 let mut buffer = vec![];
314 #[allow(unused_variables)]
315 let bytes_read = fd
316 .read_to_end(&mut buffer)
317 .map_err(|e| ProcessFunError::ProcessError(format!("Failed to read from pipe: {}", e)))?;
318
319 #[cfg(feature = "debug")]
320 eprintln!("[process-fun-debug] Read {} bytes from pipe", bytes_read);
321
322 Ok(buffer)
323}
324
325#[inline]
327pub fn fork_process() -> Result<ForkResult, ProcessFunError> {
328 #[cfg(feature = "debug")]
329 eprintln!("[process-fun-debug] Forking process");
330
331 let result = unsafe {
332 fork().map_err(|e| ProcessFunError::ProcessError(format!("Failed to fork process: {}", e)))
333 };
334
335 #[cfg(feature = "debug")]
336 if let Ok(fork_result) = &result {
337 match fork_result {
338 ForkResult::Parent { child } => {
339 eprintln!(
340 "[process-fun-debug] Fork successful - parent process, child pid: {}",
341 child
342 );
343 }
344 ForkResult::Child => {
345 eprintln!("[process-fun-debug] Fork successful - child process");
346 }
347 }
348 }
349
350 result
351}
352
353pub type FunId = PathBuf;
355
356#[derive(Error, Debug, Serialize, Deserialize)]
358pub enum ProcessFunError {
359 #[error("Multiple #[process] attributes found for function '{fun}'")]
362 MultipleTags { fun: FunId },
363
364 #[error("Expected #[process] attribute only on function with implementation but found '{item_text}'")]
367 BadItemType { item_text: String },
368
369 #[error("Failed to read or write file: {0}")]
371 IoError(String),
372
373 #[error("Failed to parse Rust file: {0}")]
375 ParseError(String),
376
377 #[error("Process communication error: {0}")]
379 ProcessError(String),
380
381 #[error("Failed to serialize or deserialize: {0}")]
383 SerError(String),
384
385 #[error("Process execution timed out")]
387 TimeoutError,
388}
389
390impl From<bincode::Error> for ProcessFunError {
391 fn from(err: bincode::Error) -> Self {
392 ProcessFunError::SerError(err.to_string())
393 }
394}
395
396impl From<std::io::Error> for ProcessFunError {
397 fn from(err: std::io::Error) -> Self {
398 ProcessFunError::IoError(err.to_string())
399 }
400}
401
402impl From<syn::Error> for ProcessFunError {
403 fn from(err: syn::Error) -> Self {
404 ProcessFunError::ParseError(err.to_string())
405 }
406}