folk_runtime_fork/
handle.rs1use anyhow::Result;
7use async_trait::async_trait;
8use folk_core::runtime::WorkerHandle;
9use folk_protocol::{FrameCodec, RpcMessage};
10use futures_util::{SinkExt, StreamExt};
11use tokio::net::UnixStream;
12use tokio_util::codec::Framed;
13
14pub struct ForkWorkerHandle {
15 pid: u32,
16 task: Framed<UnixStream, FrameCodec>,
17 control: Framed<UnixStream, FrameCodec>,
18}
19
20impl ForkWorkerHandle {
21 pub fn new(pid: u32, task_stream: UnixStream, control_stream: UnixStream) -> Self {
22 Self {
23 pid,
24 task: Framed::new(task_stream, FrameCodec::new()),
25 control: Framed::new(control_stream, FrameCodec::new()),
26 }
27 }
28}
29
30#[async_trait]
31impl WorkerHandle for ForkWorkerHandle {
32 fn pid(&self) -> u32 {
33 self.pid
34 }
35
36 async fn send_task(&mut self, msg: RpcMessage) -> Result<()> {
37 self.task.send(msg).await?;
38 Ok(())
39 }
40
41 async fn recv_task(&mut self) -> Result<Option<RpcMessage>> {
42 match self.task.next().await {
43 Some(Ok(msg)) => Ok(Some(msg)),
44 Some(Err(e)) => Err(e.into()),
45 None => Ok(None),
46 }
47 }
48
49 async fn send_control(&mut self, msg: RpcMessage) -> Result<()> {
50 self.control.send(msg).await?;
51 Ok(())
52 }
53
54 async fn recv_control(&mut self) -> Result<Option<RpcMessage>> {
55 match self.control.next().await {
56 Some(Ok(msg)) => Ok(Some(msg)),
57 Some(Err(e)) => Err(e.into()),
58 None => Ok(None),
59 }
60 }
61
62 #[allow(unsafe_code, clippy::cast_possible_wrap)]
63 async fn terminate(&mut self) -> Result<()> {
64 unsafe {
66 libc::kill(self.pid as libc::pid_t, libc::SIGTERM);
67 }
68
69 let grace = tokio::time::Duration::from_secs(5);
71 tokio::time::sleep(grace).await;
72
73 let alive = unsafe { libc::kill(self.pid as libc::pid_t, 0) } == 0;
75 if alive {
76 tracing::warn!(
77 pid = self.pid,
78 "worker did not exit after SIGTERM; sending SIGKILL"
79 );
80 unsafe {
81 libc::kill(self.pid as libc::pid_t, libc::SIGKILL);
82 }
83 }
84
85 Ok(())
86 }
87}