folk_runtime_pipe/
handle.rs1use anyhow::Result;
4use async_trait::async_trait;
5use folk_core::runtime::WorkerHandle;
6use folk_protocol::{FrameCodec, RpcMessage};
7use futures_util::{SinkExt, StreamExt};
8use tokio::net::UnixStream;
9use tokio::process::Child;
10use tokio_util::codec::Framed;
11
12pub struct PipeWorkerHandle {
13 child: Child,
14 task: Framed<UnixStream, FrameCodec>,
15 control: Framed<UnixStream, FrameCodec>,
16}
17
18impl PipeWorkerHandle {
19 pub fn new(child: Child, task_stream: UnixStream, control_stream: UnixStream) -> Self {
20 Self {
21 child,
22 task: Framed::new(task_stream, FrameCodec::new()),
23 control: Framed::new(control_stream, FrameCodec::new()),
24 }
25 }
26}
27
28#[async_trait]
29impl WorkerHandle for PipeWorkerHandle {
30 fn pid(&self) -> u32 {
31 self.child.id().unwrap_or(0)
32 }
33
34 async fn send_task(&mut self, msg: RpcMessage) -> Result<()> {
35 self.task.send(msg).await?;
36 Ok(())
37 }
38
39 async fn recv_task(&mut self) -> Result<Option<RpcMessage>> {
40 match self.task.next().await {
41 Some(Ok(msg)) => Ok(Some(msg)),
42 Some(Err(e)) => Err(e.into()),
43 None => Ok(None),
44 }
45 }
46
47 async fn send_control(&mut self, msg: RpcMessage) -> Result<()> {
48 self.control.send(msg).await?;
49 Ok(())
50 }
51
52 async fn recv_control(&mut self) -> Result<Option<RpcMessage>> {
53 match self.control.next().await {
54 Some(Ok(msg)) => Ok(Some(msg)),
55 Some(Err(e)) => Err(e.into()),
56 None => Ok(None),
57 }
58 }
59
60 #[allow(unsafe_code)]
61 async fn terminate(&mut self) -> Result<()> {
62 let pid = self.child.id().unwrap_or(0);
63
64 #[allow(clippy::cast_possible_wrap)]
67 unsafe {
68 libc::kill(pid as libc::pid_t, libc::SIGTERM);
69 }
70
71 let grace = tokio::time::Duration::from_secs(5);
73 let exited = tokio::time::timeout(grace, self.child.wait()).await;
74
75 if let Ok(Ok(_)) = exited {
76 tracing::debug!(pid, "worker exited cleanly after SIGTERM");
77 } else {
78 tracing::warn!(pid, "worker did not exit after SIGTERM; sending SIGKILL");
79 #[allow(clippy::cast_possible_wrap)]
81 unsafe {
82 libc::kill(pid as libc::pid_t, libc::SIGKILL);
83 }
84 let _ = self.child.wait().await;
85 }
86
87 Ok(())
88 }
89}