Skip to main content

folk_runtime_pipe/
handle.rs

1//! `PipeWorkerHandle`: wraps two Framed Unix streams for task+control.
2
3use anyhow::Result;
4use async_trait::async_trait;
5use folk_core::runtime::WorkerHandle;
6use folk_protocol::{FrameCodec, RpcMessage};
7use futures_util::{SinkExt, StreamExt};
8use tokio::net::UnixStream;
9use tokio::process::Child;
10use tokio_util::codec::Framed;
11
12pub struct PipeWorkerHandle {
13    child: Child,
14    task: Framed<UnixStream, FrameCodec>,
15    control: Framed<UnixStream, FrameCodec>,
16}
17
18impl PipeWorkerHandle {
19    pub fn new(child: Child, task_stream: UnixStream, control_stream: UnixStream) -> Self {
20        Self {
21            child,
22            task: Framed::new(task_stream, FrameCodec::new()),
23            control: Framed::new(control_stream, FrameCodec::new()),
24        }
25    }
26}
27
28#[async_trait]
29impl WorkerHandle for PipeWorkerHandle {
30    fn pid(&self) -> u32 {
31        self.child.id().unwrap_or(0)
32    }
33
34    async fn send_task(&mut self, msg: RpcMessage) -> Result<()> {
35        self.task.send(msg).await?;
36        Ok(())
37    }
38
39    async fn recv_task(&mut self) -> Result<Option<RpcMessage>> {
40        match self.task.next().await {
41            Some(Ok(msg)) => Ok(Some(msg)),
42            Some(Err(e)) => Err(e.into()),
43            None => Ok(None),
44        }
45    }
46
47    async fn send_control(&mut self, msg: RpcMessage) -> Result<()> {
48        self.control.send(msg).await?;
49        Ok(())
50    }
51
52    async fn recv_control(&mut self) -> Result<Option<RpcMessage>> {
53        match self.control.next().await {
54            Some(Ok(msg)) => Ok(Some(msg)),
55            Some(Err(e)) => Err(e.into()),
56            None => Ok(None),
57        }
58    }
59
60    #[allow(unsafe_code)]
61    async fn terminate(&mut self) -> Result<()> {
62        let pid = self.child.id().unwrap_or(0);
63
64        // Send SIGTERM (tokio's start_kill sends SIGKILL, not SIGTERM)
65        // SAFETY: kill is a well-defined syscall with a valid pid.
66        #[allow(clippy::cast_possible_wrap)]
67        unsafe {
68            libc::kill(pid as libc::pid_t, libc::SIGTERM);
69        }
70
71        // Wait up to 5 seconds for clean exit
72        let grace = tokio::time::Duration::from_secs(5);
73        let exited = tokio::time::timeout(grace, self.child.wait()).await;
74
75        if let Ok(Ok(_)) = exited {
76            tracing::debug!(pid, "worker exited cleanly after SIGTERM");
77        } else {
78            tracing::warn!(pid, "worker did not exit after SIGTERM; sending SIGKILL");
79            // SAFETY: kill with SIGKILL on a valid pid.
80            #[allow(clippy::cast_possible_wrap)]
81            unsafe {
82                libc::kill(pid as libc::pid_t, libc::SIGKILL);
83            }
84            let _ = self.child.wait().await;
85        }
86
87        Ok(())
88    }
89}