Skip to main content

folk_runtime_fork/
master.rs

1//! Prefork master management: spawn the master PHP process and issue fork commands.
2
3use std::os::unix::io::{AsRawFd, OwnedFd};
4
5use anyhow::{Context, Result, bail};
6use folk_protocol::{FrameCodec, RpcMessage};
7use futures_util::{SinkExt, StreamExt};
8use rmpv::Value;
9use tokio::net::UnixStream;
10use tokio_util::codec::Framed;
11use tracing::{debug, info};
12
13use crate::scm_rights::send_fds;
14
15/// Manages the prefork master PHP process.
16pub struct PreforkMaster {
17    control: Framed<UnixStream, FrameCodec>,
18    pid: u32,
19    /// Socket for sending FDs to the master via `SCM_RIGHTS`.
20    fork_socket: UnixStream,
21}
22
23impl PreforkMaster {
24    /// Spawn the PHP prefork master (`FOLK_RUNTIME=fork-master`).
25    /// Waits for `control.fork-ready` within `boot_timeout`.
26    pub async fn spawn(php: &str, script: &str, boot_timeout: std::time::Duration) -> Result<Self> {
27        // Re-use PipeRuntime's spawn_worker for the initial master process.
28        let spawned =
29            folk_runtime_pipe::spawn::spawn_worker(php, script).context("spawn prefork master")?;
30        let mut control = Framed::new(spawned.control_master, FrameCodec::new());
31        let pid = spawned.child.id().unwrap_or(0);
32
33        // Wait for fork-ready
34        let ready = tokio::time::timeout(boot_timeout, control.next())
35            .await
36            .context("boot timeout")?
37            .context("EOF before fork-ready")?
38            .context("decode fork-ready")?;
39
40        match ready {
41            RpcMessage::Notify { ref method, .. } if method == "control.fork-ready" => {
42                info!(pid, "prefork master ready");
43            },
44            other => bail!("expected control.fork-ready, got {other:?}"),
45        }
46
47        Ok(Self {
48            control,
49            pid,
50            fork_socket: spawned.task_master,
51        })
52    }
53
54    /// Send the master a fork command + two socket FDs (task + control for the new child).
55    /// Returns the child's PID from the master's reply.
56    #[allow(unsafe_code)]
57    pub async fn fork_worker(&mut self, task_child: &OwnedFd, ctrl_child: &OwnedFd) -> Result<u32> {
58        // Send FDs via SCM_RIGHTS over fork_socket
59        unsafe {
60            send_fds(
61                self.fork_socket.as_raw_fd(),
62                &[task_child.as_raw_fd(), ctrl_child.as_raw_fd()],
63            )?;
64        }
65
66        // Send fork command on control channel
67        self.control
68            .send(RpcMessage::notify("fork.spawn", Value::Nil))
69            .await?;
70
71        // Receive child PID from master
72        let reply = tokio::time::timeout(std::time::Duration::from_secs(10), self.control.next())
73            .await
74            .context("fork reply timeout")?
75            .context("EOF from master")?
76            .context("decode fork reply")?;
77
78        match reply {
79            RpcMessage::Notify { method, params } if method == "fork.spawned" => {
80                #[allow(clippy::cast_possible_truncation)]
81                let pid = params
82                    .as_map()
83                    .and_then(|m| m.iter().find(|(k, _)| k.as_str() == Some("pid")))
84                    .and_then(|(_, v)| v.as_u64())
85                    .context("missing pid in fork.spawned")? as u32;
86                debug!(child_pid = pid, "child forked");
87                Ok(pid)
88            },
89            other => bail!("unexpected reply to fork.spawn: {other:?}"),
90        }
91    }
92
93    /// Shut down the prefork master.
94    #[allow(unsafe_code, clippy::cast_possible_wrap)]
95    pub async fn shutdown(&mut self) -> Result<()> {
96        let _ = self
97            .control
98            .send(RpcMessage::notify("control.shutdown", Value::Nil))
99            .await;
100        unsafe {
101            libc::kill(self.pid as libc::pid_t, libc::SIGTERM);
102        }
103        Ok(())
104    }
105}