Skip to main content

folk_runtime_fork/
handle.rs

1//! `ForkWorkerHandle`: wraps two Framed Unix streams for a forked child.
2//!
3//! Similar to `PipeWorkerHandle` but the child is identified by PID (not
4//! a `tokio::process::Child`), since the PHP master owns the forked process.
5
6use anyhow::Result;
7use async_trait::async_trait;
8use folk_core::runtime::WorkerHandle;
9use folk_protocol::{FrameCodec, RpcMessage};
10use futures_util::{SinkExt, StreamExt};
11use tokio::net::UnixStream;
12use tokio_util::codec::Framed;
13
14pub struct ForkWorkerHandle {
15    pid: u32,
16    task: Framed<UnixStream, FrameCodec>,
17    control: Framed<UnixStream, FrameCodec>,
18}
19
20impl ForkWorkerHandle {
21    pub fn new(pid: u32, task_stream: UnixStream, control_stream: UnixStream) -> Self {
22        Self {
23            pid,
24            task: Framed::new(task_stream, FrameCodec::new()),
25            control: Framed::new(control_stream, FrameCodec::new()),
26        }
27    }
28}
29
30#[async_trait]
31impl WorkerHandle for ForkWorkerHandle {
32    fn pid(&self) -> u32 {
33        self.pid
34    }
35
36    async fn send_task(&mut self, msg: RpcMessage) -> Result<()> {
37        self.task.send(msg).await?;
38        Ok(())
39    }
40
41    async fn recv_task(&mut self) -> Result<Option<RpcMessage>> {
42        match self.task.next().await {
43            Some(Ok(msg)) => Ok(Some(msg)),
44            Some(Err(e)) => Err(e.into()),
45            None => Ok(None),
46        }
47    }
48
49    async fn send_control(&mut self, msg: RpcMessage) -> Result<()> {
50        self.control.send(msg).await?;
51        Ok(())
52    }
53
54    async fn recv_control(&mut self) -> Result<Option<RpcMessage>> {
55        match self.control.next().await {
56            Some(Ok(msg)) => Ok(Some(msg)),
57            Some(Err(e)) => Err(e.into()),
58            None => Ok(None),
59        }
60    }
61
62    #[allow(unsafe_code, clippy::cast_possible_wrap)]
63    async fn terminate(&mut self) -> Result<()> {
64        // Send SIGTERM
65        unsafe {
66            libc::kill(self.pid as libc::pid_t, libc::SIGTERM);
67        }
68
69        // Wait 5 seconds, then SIGKILL
70        let grace = tokio::time::Duration::from_secs(5);
71        tokio::time::sleep(grace).await;
72
73        // Check if still alive, send SIGKILL
74        let alive = unsafe { libc::kill(self.pid as libc::pid_t, 0) } == 0;
75        if alive {
76            tracing::warn!(
77                pid = self.pid,
78                "worker did not exit after SIGTERM; sending SIGKILL"
79            );
80            unsafe {
81                libc::kill(self.pid as libc::pid_t, libc::SIGKILL);
82            }
83        }
84
85        Ok(())
86    }
87}