process_fun_core/
lib.rs

1//! # process-fun-core
2//!
3//! Core functionality for the process-fun library. This crate provides the fundamental types
4//! and functions needed to support out-of-process function execution.
5//!
6//! This crate is not meant to be used directly - instead, use the `process-fun` crate
7//! which provides a more ergonomic API.
8
9use interprocess::unnamed_pipe::{Recver, Sender};
10use nix::errno::Errno;
11use nix::fcntl::OFlag;
12use nix::sys::signal::{self, Signal};
13use nix::unistd::{fork, pipe2, ForkResult, Pid};
14use serde::{Deserialize, Serialize};
15use std::io::prelude::*;
16use std::path::PathBuf;
17use std::sync::mpsc;
18use std::sync::{Arc, Mutex};
19use std::time::{Duration, SystemTime};
20use std::{fmt, mem};
21use thiserror::Error;
22
// Re-export specific items needed by generated code with clear namespacing
/// Re-exports of the `nix` process primitives under a single namespace.
///
/// Exposes the `signal` module and `Signal` type, `waitpid` with its
/// `WaitStatus`, and `fork`, `getpid`, `ForkResult` and `Pid`, so that code
/// referring to them through this crate does not need to name `nix` itself.
pub mod sys {
    pub use nix::sys::signal::{self, Signal};
    pub use nix::sys::wait::{waitpid, WaitStatus};
    pub use nix::unistd::{fork, getpid, ForkResult, Pid};
}
29
30// Use a more efficient binary serialization format
31pub mod ser {
32    use bincode::{deserialize, serialize, Error};
33    use serde::{Deserialize, Serialize};
34    pub fn to_vec<T: Serialize>(value: &T) -> Result<Vec<u8>, Error> {
35        serialize(value)
36    }
37
38    pub fn from_slice<'de, T: Deserialize<'de>>(bytes: &'de [u8]) -> Result<T, Error> {
39        let val = deserialize(bytes)?;
40        Ok(val)
41    }
42}
43
/// Wrapper for a process execution that allows awaiting or aborting the process
///
/// `T` is the type the child's result bytes are deserialized into; no value of
/// `T` is stored in the wrapper itself.
#[derive(Debug)]
pub struct ProcessWrapper<T> {
    // Pid of the child process this wrapper controls.
    child_pid: Pid,
    // Start time read from the pipe on first use; `None` until then. It is
    // compared against `/proc/<pid>/stat` before the child is signalled.
    start_time: Option<SystemTime>,
    // Read end of the pipe; `None` once it has been consumed or dropped.
    receiver: Option<Recver>,
    // Raw result bytes stored by the reader thread that `timeout` spawns.
    result: Arc<Mutex<Option<Vec<u8>>>>,
    // Ties the wrapper to `T` without storing one.
    _ghost: std::marker::PhantomData<T>,
}
53
54impl<T> fmt::Display for ProcessWrapper<T> {
55    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
56        write!(f, "Process(pid={})", self.child_pid)
57    }
58}
59
60impl<T> ProcessWrapper<T>
61where
62    T: serde::de::DeserializeOwned,
63{
64    /// Create a new ProcessWrapper
65    pub fn new(child_pid: Pid, receiver: Recver) -> Self {
66        Self {
67            child_pid,
68            start_time: None,
69            receiver: Some(receiver),
70            result: Arc::new(Mutex::new(None)),
71            _ghost: std::marker::PhantomData,
72        }
73    }
74
75    /// Wait for the process to complete and return its result
76    pub fn wait(&mut self) -> Result<T, ProcessFunError> {
77        // Ensure we have the start time for process validation
78        self.ensure_start_time()?;
79
80        // Check if we already have a result
81        if let Some(bytes) = self.result.lock().unwrap().take() {
82            return ser::from_slice(&bytes).map_err(ProcessFunError::from);
83        }
84
85        // Read result from pipe
86        let receiver = self.receiver.take().ok_or_else(|| {
87            ProcessFunError::ProcessError("Process already completed".to_string())
88        })?;
89
90        let mut receiver = receiver;
91        let result_bytes = read_from_pipe(&mut receiver)?;
92        let result: T = ser::from_slice(&result_bytes)?;
93
94        Ok(result)
95    }
96
97    /// Wait for the process to complete with a timeout
98    pub fn timeout(&mut self, duration: Duration) -> Result<T, ProcessFunError> {
99        // Ensure we have the start time for process validation
100        self.ensure_start_time()?;
101
102        // Take ownership of the receiver
103        let receiver = self.receiver.take().ok_or_else(|| {
104            ProcessFunError::ProcessError("Process already completed".to_string())
105        })?;
106
107        // Create a channel for the calculation thread to signal completion
108        let (tx, rx) = mpsc::channel();
109
110        // Spawn thread to read from pipe
111        let result = self.result.clone();
112        std::thread::spawn(move || {
113            let mut receiver = receiver;
114            if let Ok(bytes) = read_from_pipe(&mut receiver) {
115                *result.lock().unwrap() = Some(bytes);
116                let _ = tx.send(true); // Signal completion
117            }
118        });
119
120        // Wait for either timeout or completion
121        match rx.recv_timeout(duration) {
122            Ok(_) => {
123                // Process completed within timeout
124                if let Some(bytes) = self.result.lock().unwrap().take() {
125                    return ser::from_slice(&bytes).map_err(ProcessFunError::from);
126                }
127                // This shouldn't happen as we got a completion signal
128                Err(ProcessFunError::ProcessError(
129                    "Process result not found".to_string(),
130                ))
131            }
132            Err(_) => {
133                // Timeout occurred
134                self.abort()?;
135                Err(ProcessFunError::TimeoutError)
136            }
137        }
138    }
139}
140
141#[inline]
142pub fn stat_pid_start(pid: Pid) -> Result<SystemTime, ProcessFunError> {
143    let proc_path = format!("/proc/{}/stat", pid.as_raw());
144    nix::sys::stat::stat(proc_path.as_str())
145        .map_err(|e| ProcessFunError::ProcessError(format!("Failed to stat process: {}", e)))
146        .and_then(|stat| {
147            SystemTime::UNIX_EPOCH
148                .checked_add(Duration::from_secs(stat.st_ctime as u64))
149                .ok_or_else(|| {
150                    ProcessFunError::ProcessError(
151                        "Failed to calculate process start time".to_string(),
152                    )
153                })
154        })
155}
156
157impl<T> ProcessWrapper<T> {
158    /// Lazily read the start time from pipe if not already read
159    #[inline]
160    fn ensure_start_time(&mut self) -> Result<(), ProcessFunError> {
161        if self.start_time.is_some() {
162            return Ok(());
163        }
164
165        if let Some(receiver) = &mut self.receiver {
166            let start_time = read_start_time_from_pipe(receiver)?;
167            self.start_time = Some(start_time);
168            Ok(())
169        } else {
170            Err(ProcessFunError::ProcessError(
171                "Process already completed".to_string(),
172            ))
173        }
174    }
175
176    /// Check if the process is still the same one we created
177    #[inline]
178    fn is_same_process(&mut self) -> bool {
179        if self.ensure_start_time().is_err() {
180            return false;
181        }
182        // Ensure we have the start time for validation
183        if let Some(start_time) = self.start_time {
184            stat_pid_start(self.child_pid)
185                .map(|stat| stat == start_time)
186                .unwrap_or(false)
187        } else {
188            false
189        }
190    }
191
192    #[inline]
193    fn kill(&mut self) -> Result<(), Errno> {
194        // Only kill if it's the same process we created
195        if self.is_same_process() {
196            match signal::kill(self.child_pid, Signal::SIGKILL) {
197                Ok(()) => Ok(()),
198                Err(Errno::ESRCH) => Ok(()), // Process already exited
199                Err(e) => Err(e),
200            }
201        } else {
202            Ok(()) // Different process with same PID, consider it "already killed"
203        }
204    }
205
206    /// Abort the process
207    pub fn abort(&mut self) -> Result<(), ProcessFunError> {
208        // Take ownership of the receiver to ensure it's dropped
209        let _ = self.receiver.take();
210
211        self.kill().map_err(|e| {
212            ProcessFunError::ProcessError(format!("Failed to send SIGKILL to process: {}", e))
213        })?;
214        Ok(())
215    }
216}
217
218impl<T> Drop for ProcessWrapper<T> {
219    fn drop(&mut self) {
220        // Take ownership of the receiver to ensure it's dropped
221        let _ = self.receiver.take();
222
223        // Attempt to kill the process if it's still running
224        let _ = self.kill();
225    }
226}
227
228/// Create a pipe for communication between parent and child processes
229#[inline]
230pub fn create_pipes() -> Result<(Recver, Sender), ProcessFunError> {
231    #[cfg(feature = "debug")]
232    eprintln!("[process-fun-debug] Creating communication pipes");
233
234    // Create pipe with O_CLOEXEC flag
235    let (read_fd, write_fd) = pipe2(OFlag::O_CLOEXEC)
236        .map_err(|e| ProcessFunError::ProcessError(format!("Failed to create pipe: {}", e)))?;
237
238    // Convert raw file descriptors to Sender/Recver
239    let recver = Recver::from(read_fd);
240    let sender = Sender::from(write_fd);
241
242    #[cfg(feature = "debug")]
243    eprintln!("[process-fun-debug] Pipes created successfully");
244
245    Ok((recver, sender))
246}
247
/// Size in bytes of the fixed-length start-time record sent over the pipe.
const SYSTEM_TIME_SIZE: usize = mem::size_of::<SystemTime>();

// The encoding below needs 8 (seconds) + 4 (nanoseconds) + 1 (sign) bytes.
const _: () = assert!(
    SYSTEM_TIME_SIZE >= 13,
    "SystemTime is too small to hold the encoded start-time record"
);

/// Encode `time` into a fixed-size record.
///
/// Layout (native endianness, both ends live on the same machine):
/// bytes `0..8` whole seconds from the Unix epoch, bytes `8..12` sub-second
/// nanoseconds, byte `12` is `1` when `time` lies before the epoch and `0`
/// otherwise; remaining bytes are zero.
///
/// The `_unsafe` suffix is kept only so existing callers keep compiling; the
/// implementation no longer transmutes, because reinterpreting `SystemTime`
/// as bytes reads its padding and is undefined behaviour.
#[inline]
fn system_time_to_bytes_unsafe(time: SystemTime) -> [u8; SYSTEM_TIME_SIZE] {
    let (before_epoch, offset) = match time.duration_since(SystemTime::UNIX_EPOCH) {
        Ok(elapsed) => (0u8, elapsed),
        Err(earlier) => (1u8, earlier.duration()),
    };

    let mut bytes = [0u8; SYSTEM_TIME_SIZE];
    bytes[..8].copy_from_slice(&offset.as_secs().to_ne_bytes());
    bytes[8..12].copy_from_slice(&offset.subsec_nanos().to_ne_bytes());
    bytes[12] = before_epoch;
    bytes
}

/// Decode a record produced by `system_time_to_bytes_unsafe`.
///
/// Never panics and never produces an invalid value: a record whose offset
/// cannot be represented decodes to `SystemTime::UNIX_EPOCH`.
#[inline]
fn bytes_to_system_time_unsafe(bytes: [u8; SYSTEM_TIME_SIZE]) -> SystemTime {
    let mut secs_raw = [0u8; 8];
    secs_raw.copy_from_slice(&bytes[..8]);
    let mut nanos_raw = [0u8; 4];
    nanos_raw.copy_from_slice(&bytes[8..12]);

    // Saturating arithmetic keeps untrusted input from triggering an
    // overflow panic while building the duration.
    let offset = Duration::from_secs(u64::from_ne_bytes(secs_raw))
        .saturating_add(Duration::from_nanos(u64::from(u32::from_ne_bytes(nanos_raw))));

    let decoded = if bytes[12] == 0 {
        SystemTime::UNIX_EPOCH.checked_add(offset)
    } else {
        SystemTime::UNIX_EPOCH.checked_sub(offset)
    };
    decoded.unwrap_or(SystemTime::UNIX_EPOCH)
}
259
260/// Write time to pipe
261#[inline]
262pub fn write_time(fd: &mut Sender, time: SystemTime) -> Result<(), ProcessFunError> {
263    #[cfg(feature = "debug")]
264    eprintln!("[process-fun-debug] Writing start time to pipe");
265
266    let time_bytes = system_time_to_bytes_unsafe(time);
267    fd.write_all(&time_bytes)?;
268
269    #[cfg(feature = "debug")]
270    eprintln!("[process-fun-debug] Successfully wrote start time to pipe");
271
272    Ok(())
273}
274
275/// Write data to a pipe and close it
276#[inline]
277pub fn write_to_pipe(mut fd: Sender, data: &[u8]) -> Result<(), ProcessFunError> {
278    #[cfg(feature = "debug")]
279    eprintln!("[process-fun-debug] Writing {} bytes to pipe", data.len());
280
281    fd.write_all(data)
282        .map_err(|e| ProcessFunError::ProcessError(format!("Failed to write to pipe: {}", e)))?;
283
284    // Let the pipe be automatically flushed and closed when dropped
285    #[cfg(feature = "debug")]
286    eprintln!("[process-fun-debug] Successfully wrote data to pipe");
287
288    Ok(())
289}
290
291/// Read start time from pipe
292#[inline]
293pub fn read_start_time_from_pipe(fd: &mut Recver) -> Result<SystemTime, ProcessFunError> {
294    #[cfg(feature = "debug")]
295    eprintln!("[process-fun-debug] Reading start time from pipe");
296
297    let mut buffer = [0u8; SYSTEM_TIME_SIZE];
298    fd.read_exact(&mut buffer)?;
299    let start_time: SystemTime = bytes_to_system_time_unsafe(buffer);
300
301    #[cfg(feature = "debug")]
302    eprintln!("[process-fun-debug] Read start time from pipe");
303
304    Ok(start_time)
305}
306
307/// Read data from a pipe
308#[inline]
309pub fn read_from_pipe(fd: &mut Recver) -> Result<Vec<u8>, ProcessFunError> {
310    #[cfg(feature = "debug")]
311    eprintln!("[process-fun-debug] Starting to read from pipe");
312
313    let mut buffer = vec![];
314    #[allow(unused_variables)]
315    let bytes_read = fd
316        .read_to_end(&mut buffer)
317        .map_err(|e| ProcessFunError::ProcessError(format!("Failed to read from pipe: {}", e)))?;
318
319    #[cfg(feature = "debug")]
320    eprintln!("[process-fun-debug] Read {} bytes from pipe", bytes_read);
321
322    Ok(buffer)
323}
324
325/// Fork the current process and return ForkResult
326#[inline]
327pub fn fork_process() -> Result<ForkResult, ProcessFunError> {
328    #[cfg(feature = "debug")]
329    eprintln!("[process-fun-debug] Forking process");
330
331    let result = unsafe {
332        fork().map_err(|e| ProcessFunError::ProcessError(format!("Failed to fork process: {}", e)))
333    };
334
335    #[cfg(feature = "debug")]
336    if let Ok(fork_result) = &result {
337        match fork_result {
338            ForkResult::Parent { child } => {
339                eprintln!(
340                    "[process-fun-debug] Fork successful - parent process, child pid: {}",
341                    child
342                );
343            }
344            ForkResult::Child => {
345                eprintln!("[process-fun-debug] Fork successful - child process");
346            }
347        }
348    }
349
350    result
351}
352
/// Type alias for function identifiers, represented as filesystem paths
///
/// Used as the `fun` field of `ProcessFunError::MultipleTags`.
pub type FunId = PathBuf;
355
/// Errors that can occur during process-fun operations
///
/// Every payload is a plain string or path, so the whole enum derives
/// `Serialize` and `Deserialize`.
#[derive(Error, Debug, Serialize, Deserialize)]
pub enum ProcessFunError {
    /// Multiple #[process] attributes were found on a single function.
    /// Only one #[process] attribute is allowed per function.
    #[error("Multiple #[process] attributes found for function '{fun}'")]
    MultipleTags { fun: FunId },

    /// The #[process] attribute was used on an invalid item type.
    /// It can only be used on function definitions.
    #[error("Expected #[process] attribute only on function with implementation but found '{item_text}'")]
    BadItemType { item_text: String },

    /// An I/O error occurred during process execution or file operations.
    /// Holds the message of the originating `std::io::Error`.
    #[error("Failed to read or write file: {0}")]
    IoError(String),

    /// Failed to parse Rust source code.
    /// Holds the message of the originating `syn::Error`.
    #[error("Failed to parse Rust file: {0}")]
    ParseError(String),

    /// Error during process communication between parent and child processes
    #[error("Process communication error: {0}")]
    ProcessError(String),

    /// Serialization/deserialization error for function arguments or results.
    /// Holds the message of the originating `bincode::Error`.
    #[error("Failed to serialize or deserialize: {0}")]
    SerError(String),

    /// Process execution timed out
    #[error("Process execution timed out")]
    TimeoutError,
}
389
390impl From<bincode::Error> for ProcessFunError {
391    fn from(err: bincode::Error) -> Self {
392        ProcessFunError::SerError(err.to_string())
393    }
394}
395
396impl From<std::io::Error> for ProcessFunError {
397    fn from(err: std::io::Error) -> Self {
398        ProcessFunError::IoError(err.to_string())
399    }
400}
401
402impl From<syn::Error> for ProcessFunError {
403    fn from(err: syn::Error) -> Self {
404        ProcessFunError::ParseError(err.to_string())
405    }
406}