Skip to main content

folk_core/
runtime.rs

//! Abstraction over how PHP workers are spawned.
//!
//! `WorkerPool` does not know what a PHP process is. It asks the
//! `Runtime` to spawn a worker; the runtime returns a `WorkerHandle` which
//! gives the pool ways to send/receive frames and to terminate the worker.
//!
//! `folk-runtime-pipe` (phase 4) implements this trait by spawning real PHP
//! processes via execve. `folk-runtime-fork` (phase 10) implements it via
//! prefork. Tests use `MockRuntime` below.
10
11use std::collections::VecDeque;
12use std::sync::Mutex;
13
14use anyhow::Result;
15use async_trait::async_trait;
16use folk_protocol::RpcMessage;
17
18/// A handle to a spawned worker.
19///
20/// The pool sends task-channel frames via `send_task` and receives them via
21/// `recv_task`. Control-channel frames travel through `send_control` and
22/// `recv_control`. Termination is via `terminate`.
23#[async_trait]
24pub trait WorkerHandle: Send + 'static {
25    /// Worker process ID (informational; used for logging).
26    fn pid(&self) -> u32;
27
28    /// Send a frame on the task channel.
29    async fn send_task(&mut self, msg: RpcMessage) -> Result<()>;
30
31    /// Receive a frame from the task channel. Returns `None` on EOF.
32    async fn recv_task(&mut self) -> Result<Option<RpcMessage>>;
33
34    /// Send a frame on the control channel.
35    async fn send_control(&mut self, msg: RpcMessage) -> Result<()>;
36
37    /// Receive a frame from the control channel. Returns `None` on EOF.
38    async fn recv_control(&mut self) -> Result<Option<RpcMessage>>;
39
40    /// Request the worker to terminate. Implementations send SIGTERM, then
41    /// after a grace period SIGKILL, then `waitpid`.
42    async fn terminate(&mut self) -> Result<()>;
43}
44
45/// Spawns workers per a runtime-specific strategy.
46#[async_trait]
47pub trait Runtime: Send + Sync + 'static {
48    /// Spawn a single worker and return a handle.
49    ///
50    /// The handle's task and control sockets must be ready: a subsequent
51    /// `recv_control` is expected to yield `control.ready` within the boot
52    /// timeout (enforced by the pool, not the runtime).
53    async fn spawn(&self) -> Result<Box<dyn WorkerHandle>>;
54}
55
// --- MockRuntime: in-memory runtime for tests ---
57
58/// In-memory runtime used in tests. Each spawned worker is a `MockWorker`
59/// that returns canned responses for any task-channel request.
60pub struct MockRuntime {
61    responder: std::sync::Arc<dyn Fn(&RpcMessage) -> RpcMessage + Send + Sync>,
62    next_pid: std::sync::atomic::AtomicU32,
63}
64
65impl MockRuntime {
66    /// Create a mock runtime that echoes every request as a successful response.
67    pub fn echo() -> Self {
68        Self {
69            responder: std::sync::Arc::new(|msg| match msg {
70                RpcMessage::Request { msgid, params, .. } => {
71                    RpcMessage::response_ok(*msgid, params.clone())
72                },
73                _ => RpcMessage::notify("mock.unsupported", rmpv::Value::Nil),
74            }),
75            next_pid: std::sync::atomic::AtomicU32::new(10000),
76        }
77    }
78}
79
80#[async_trait]
81impl Runtime for MockRuntime {
82    async fn spawn(&self) -> Result<Box<dyn WorkerHandle>> {
83        let pid = self
84            .next_pid
85            .fetch_add(1, std::sync::atomic::Ordering::Relaxed);
86        Ok(Box::new(MockWorker {
87            pid,
88            responder: self.responder.clone(),
89            inbound_task: Mutex::new(VecDeque::new()),
90            inbound_control: Mutex::new({
91                let mut q = VecDeque::new();
92                // Pre-load the Ready handshake.
93                q.push_back(RpcMessage::notify(
94                    "control.ready",
95                    rmpv::Value::Map(vec![(
96                        rmpv::Value::String("pid".into()),
97                        rmpv::Value::Integer(pid.into()),
98                    )]),
99                ));
100                q
101            }),
102            terminated: false,
103        }))
104    }
105}
106
107/// In-memory worker used by `MockRuntime`.
108pub struct MockWorker {
109    pid: u32,
110    responder: std::sync::Arc<dyn Fn(&RpcMessage) -> RpcMessage + Send + Sync>,
111    inbound_task: Mutex<VecDeque<RpcMessage>>,
112    inbound_control: Mutex<VecDeque<RpcMessage>>,
113    terminated: bool,
114}
115
116#[async_trait]
117impl WorkerHandle for MockWorker {
118    fn pid(&self) -> u32 {
119        self.pid
120    }
121
122    async fn send_task(&mut self, msg: RpcMessage) -> Result<()> {
123        if self.terminated {
124            anyhow::bail!("worker terminated");
125        }
126        let response = (self.responder)(&msg);
127        self.inbound_task.lock().unwrap().push_back(response);
128        Ok(())
129    }
130
131    async fn recv_task(&mut self) -> Result<Option<RpcMessage>> {
132        Ok(self.inbound_task.lock().unwrap().pop_front())
133    }
134
135    async fn send_control(&mut self, _msg: RpcMessage) -> Result<()> {
136        Ok(())
137    }
138
139    async fn recv_control(&mut self) -> Result<Option<RpcMessage>> {
140        Ok(self.inbound_control.lock().unwrap().pop_front())
141    }
142
143    async fn terminate(&mut self) -> Result<()> {
144        self.terminated = true;
145        Ok(())
146    }
147}