folk_runtime_fork/
master.rs1use std::os::unix::io::{AsRawFd, OwnedFd};
4
5use anyhow::{Context, Result, bail};
6use folk_protocol::{FrameCodec, RpcMessage};
7use futures_util::{SinkExt, StreamExt};
8use rmpv::Value;
9use tokio::net::UnixStream;
10use tokio_util::codec::Framed;
11use tracing::{debug, info};
12
13use crate::scm_rights::send_fds;
14
15pub struct PreforkMaster {
17 control: Framed<UnixStream, FrameCodec>,
18 pid: u32,
19 fork_socket: UnixStream,
21 _child: tokio::process::Child,
23}
24
25impl PreforkMaster {
26 pub async fn spawn(php: &str, script: &str, boot_timeout: std::time::Duration) -> Result<Self> {
29 let spawned = folk_runtime_pipe::spawn::spawn_worker_with_runtime(php, script, "fork")
32 .context("spawn prefork master")?;
33 let mut control = Framed::new(spawned.control_master, FrameCodec::new());
34 let pid = spawned.child.id().unwrap_or(0);
35
36 let ready = tokio::time::timeout(boot_timeout, control.next())
38 .await
39 .context("boot timeout")?
40 .context("EOF before fork-ready")?
41 .context("decode fork-ready")?;
42
43 match ready {
44 RpcMessage::Notify { ref method, .. } if method == "control.fork-ready" => {
45 info!(pid, "prefork master ready");
46 },
47 other => bail!("expected control.fork-ready, got {other:?}"),
48 }
49
50 Ok(Self {
51 control,
52 pid,
53 fork_socket: spawned.task_master,
54 _child: spawned.child,
55 })
56 }
57
58 #[allow(unsafe_code)]
61 pub async fn fork_worker(&mut self, task_child: &OwnedFd, ctrl_child: &OwnedFd) -> Result<u32> {
62 unsafe {
64 send_fds(
65 self.fork_socket.as_raw_fd(),
66 &[task_child.as_raw_fd(), ctrl_child.as_raw_fd()],
67 )?;
68 }
69
70 self.control
72 .send(RpcMessage::notify("fork.spawn", Value::Nil))
73 .await?;
74
75 let reply = tokio::time::timeout(std::time::Duration::from_secs(10), self.control.next())
77 .await
78 .context("fork reply timeout")?
79 .context("EOF from master")?
80 .context("decode fork reply")?;
81
82 match reply {
83 RpcMessage::Notify { method, params } if method == "fork.spawned" => {
84 #[allow(clippy::cast_possible_truncation)]
85 let pid = params
86 .as_map()
87 .and_then(|m| m.iter().find(|(k, _)| k.as_str() == Some("pid")))
88 .and_then(|(_, v)| v.as_u64())
89 .context("missing pid in fork.spawned")? as u32;
90 debug!(child_pid = pid, "child forked");
91 Ok(pid)
92 },
93 other => bail!("unexpected reply to fork.spawn: {other:?}"),
94 }
95 }
96
97 #[allow(unsafe_code, clippy::cast_possible_wrap)]
99 pub async fn shutdown(&mut self) -> Result<()> {
100 let _ = self
101 .control
102 .send(RpcMessage::notify("control.shutdown", Value::Nil))
103 .await;
104 unsafe {
105 libc::kill(self.pid as libc::pid_t, libc::SIGTERM);
106 }
107 Ok(())
108 }
109}