prefork 0.6.0

A library for forking processes
Documentation
#[cfg(feature = "tokio")]
use std::future::Future;

use crate::{
    error::Result,
    server::{run_parent, Server},
};
#[cfg(feature = "tokio")]
use tokio::runtime::Builder;

/// The default number of processes.
pub const DEFAULT_NUM_PROCESSES: u32 = 8;

/// Build a server that starts by forking processes.
///
/// # Example
/// ```
/// use std::net::TcpListener;
/// use prefork::Prefork;
///
/// async fn child(child_num: u32, listener: TcpListener) {
///     println!("Started child {child_num} with Tokio runtime.")
/// }
///
/// fn main() {
///     let listener = TcpListener::bind("0.0.0.0:3000").expect("cannot bind to port");
///     let is_parent = Prefork::from_resource(listener)
/// #       .with_num_processes(0)
///         .with_tokio(child)
///         .fork()
///         .expect("cannot fork");
///     if is_parent {
///         println!("parent finished");
///     }
/// }
/// ```
pub struct Prefork<Res>
where
    Res: 'static,
{
    resource: Res,
    num_processes: u32,
    child_init: Box<dyn Fn(u32, Res)>,
}

impl<Res> Prefork<Res>
where
    Res: 'static,
{
    /// Start building a server that will pass `resource` to each child process.
    /// # Examples
    /// Pass a UDP socket:
    /// ```
    /// # use std::net::UdpSocket;
    /// # use prefork::Prefork;
    ///
    /// let udp = UdpSocket::bind("127.0.0.1:34252").expect("cannot bind UDP socket");
    /// Prefork::from_resource(udp)
    /// #       .with_num_processes(0)
    ///         .fork();
    /// ```
    /// Pass any type:
    /// ```
    /// # use prefork::Prefork;
    ///
    /// fn show_message(child_num: u32, message: &str) {
    ///     println!("Child {child_num} says {message}.")
    /// }
    ///
    /// fn main() {
    ///     let message = "Hello World";
    ///     Prefork::from_resource(message)
    /// #           .with_num_processes(0)
    ///             .with_init(show_message)
    ///             .fork()
    ///             .expect("successful fork");
    /// }
    /// ```
    pub fn from_resource(resource: Res) -> Self {
        Self {
            resource,
            num_processes: DEFAULT_NUM_PROCESSES,
            child_init: Box::new(Self::empty_init),
        }
    }

    /// Set the number of child processes.
    /// If not set, [`DEFAULT_NUM_PROCESSES`] is used.
    /// # Example
    /// ```no_run
    /// # use prefork::Prefork;
    /// let message = "Hello World";
    /// Prefork::from_resource(message)
    ///         .with_num_processes(16)
    ///         .fork()
    ///         .expect("successful fork");
    /// ```
    #[must_use]
    pub fn with_num_processes(mut self, num_processes: u32) -> Self {
        self.num_processes = num_processes;
        self
    }

    /// Start a tokio runtime in each child process and initialise the child
    /// processes with an async function.
    /// # Example
    /// ```
    /// # use prefork::Prefork;
    ///
    /// async fn show_message(child_num: u32, message: &str) {
    ///     println!("Child {child_num} says {message}.");
    /// }
    ///
    /// fn main() {
    ///     let message = "Hello World";
    ///     Prefork::from_resource(message)
    /// #           .with_num_processes(0)
    ///             .with_tokio(show_message)
    ///             .fork()
    ///             .expect("successful fork");
    /// }
    /// ```
    #[cfg(feature = "tokio")]
    #[must_use]
    pub fn with_tokio<FAsync, Fut>(mut self, child_init: FAsync) -> Self
    where
        FAsync: Fn(u32, Res) -> Fut + 'static,
        Fut: Future<Output = ()>,
    {
        self.child_init = Self::start_tokio(child_init);
        self
    }

    /// Start each child process without an async runtime.
    ///
    /// If this function is used, consider turning off the default features.
    /// # Example
    /// ```
    /// # use prefork::Prefork;
    ///
    /// fn show_message(child_num: u32, message: &str) {
    ///     println!("Child {child_num} says {message}.")
    /// }
    ///
    /// fn main() {
    ///     let message = "Hello World";
    ///     Prefork::from_resource(message)
    /// #           .with_num_processes(0)
    ///             .with_init(show_message)
    ///             .fork()
    ///             .expect("successful fork");
    /// }
    /// ```
    #[must_use]
    pub fn with_init<FChild>(mut self, child_init: FChild) -> Self
    where
        FChild: Fn(u32, Res) + 'static,
    {
        self.child_init = Box::new(child_init);
        self
    }

    /// Fork the children.
    /// Returns true if this process is the parent.
    ///
    /// The parent process will not return until all the children are dead. Basic signal
    /// management is done by this function and `SIGINT`, `SIGTERM`, `SIGQUIT` will
    /// cause children to be killed.
    ///
    /// If more complex behaviour is needed, consider calling `server()` instead.
    ///
    /// # Errors
    /// Errors returned by `fork()` are propagated.
    ///
    /// # Example
    /// ```
    /// # use prefork::Prefork;
    ///
    /// async fn show_message(child_num: u32, message: &str) {
    ///     println!("Child {child_num} says {message}.");
    /// }
    ///
    /// fn main() {
    ///     let message = "Hello World";
    ///     let is_parent = Prefork::from_resource(message)
    /// #                           .with_num_processes(0)
    ///                             .with_tokio(show_message)
    ///                             .fork()
    ///                             .expect("successful fork");
    ///     if is_parent {
    ///        println!("Parent exit");
    ///     }
    /// }
    /// ```
    pub fn fork(self) -> Result<bool> {
        let mut server = Server::from_resource(self.resource, self.child_init, self.num_processes);
        let Some(pids) = server.prefork()? else {
            // We're a child
            return Ok(false);
        };
        if !pids.is_empty() {
            run_parent(pids)?;
        }
        Ok(true)
    }

    /// Create a prefork server that can be used to manage child processes.
    /// This is a lower level alternative to the `fork()` function. The caller is
    /// responsible for forking processes and managing child processes.
    ///
    /// For a complete example see `examples/unmanaged.rs`.
    ///
    /// # Example
    /// ```
    /// # use prefork::Prefork;
    ///
    /// async fn show_message(child_num: u32, message: &str) {
    ///     println!("Child {child_num} says {message}.");
    /// }
    ///
    /// fn main() {
    ///     let message = "Hello World";
    ///     let mut server = Prefork::from_resource(message)
    /// #                        .with_num_processes(0)
    ///                          .with_tokio(show_message)
    ///                          .server();
    ///     if let Some(pids) = server.prefork().expect("successful fork") {
    ///        println!("TODO: manage the subprocesses given by pids");
    ///     }
    /// }
    /// ```
    pub fn server(self) -> Server<Res> {
        Server::from_resource(self.resource, self.child_init, self.num_processes)
    }

    #[cfg(feature = "tokio")]
    fn start_tokio<FAsync, Fut>(child_init: FAsync) -> Box<dyn Fn(u32, Res)>
    where
        FAsync: Fn(u32, Res) -> Fut + 'static,
        Fut: Future<Output = ()>,
    {
        Box::new(move |child_num: u32, resource: Res| {
            let runtime = Builder::new_current_thread()
                .enable_io()
                .enable_time()
                .build()
                .expect("cannot create runtime");
            runtime.block_on(async {
                (child_init)(child_num, resource).await;
            });
        })
    }

    #[allow(clippy::needless_pass_by_value)]
    fn empty_init(_child_num: u32, _res: Res) {
        // Do nothing
    }
}