Skip to main content

folk_core/
runtime.rs

1//! Abstraction over how PHP workers are spawned.
2//!
3//! `WorkerPool` does not know what a PHP process is. It asks the
4//! `Runtime` to spawn a worker; the runtime returns a `WorkerHandle` which
5//! gives the pool a way to execute requests and to terminate the worker.
6//!
7//! In phase 23 (extension mode), the runtime spawns OS threads that run PHP
8//! inside the same process. Communication is via channels (zero IPC).
9
10use std::sync::Arc;
11
12use anyhow::Result;
13use async_trait::async_trait;
14
15/// A handle to a spawned worker.
16///
17/// The pool dispatches requests via `execute` and controls lifecycle via
18/// `ready` and `terminate`.
19#[async_trait]
20pub trait WorkerHandle: Send + 'static {
21    /// Worker identifier (thread ID, PID, or synthetic).
22    fn id(&self) -> u32;
23
24    /// Wait for the worker to signal readiness.
25    /// Returns once the worker has booted and is ready to accept requests.
26    async fn ready(&mut self) -> Result<()>;
27
28    /// Execute a single request: send structured data, receive result.
29    ///
30    /// `request_id` is a globally-unique id (UUID v7) for this request, exposed
31    /// to PHP via `folk_request_id()` for log correlation.
32    async fn execute(
33        &mut self,
34        method: &str,
35        payload: serde_json::Value,
36        request_id: Arc<str>,
37    ) -> Result<serde_json::Value>;
38
39    /// Terminate the worker. Implementations should signal shutdown and
40    /// wait for the worker to exit.
41    async fn terminate(&mut self) -> Result<()>;
42
43    /// Whether this worker can be recycled (terminated and respawned).
44    /// Returns `false` for the main thread worker which cannot be restarted.
45    fn is_recyclable(&self) -> bool {
46        true
47    }
48}
49
50/// Spawns workers per a runtime-specific strategy.
51#[async_trait]
52pub trait Runtime: Send + Sync + 'static {
53    /// Spawn a single worker and return a handle.
54    ///
55    /// The caller must call `ready()` before dispatching requests.
56    async fn spawn(&self) -> Result<Box<dyn WorkerHandle>>;
57
58    /// Warm up shared caches (OPcache) before spawning workers.
59    /// Called once at startup. Default: no-op.
60    async fn warmup(&self) -> Result<()> {
61        Ok(())
62    }
63
64    /// Invalidate compiled-code caches so respawned workers pick up changes.
65    ///
66    /// Called by the dev-mode file watcher just before workers are recycled.
67    /// In ZTS the OPcache is shared process-wide, so a single reset affects
68    /// every worker thread. Default: no-op.
69    async fn reload(&self) -> Result<()> {
70        Ok(())
71    }
72}
73
74// --- MockRuntime: in-memory runtime for tests ---
75
76type MockResponder =
77    std::sync::Arc<dyn Fn(&str, &serde_json::Value) -> Result<serde_json::Value> + Send + Sync>;
78
79/// In-memory runtime used in tests. Each spawned worker echoes requests back.
80pub struct MockRuntime {
81    responder: MockResponder,
82    next_id: std::sync::atomic::AtomicU32,
83    /// Request ids observed by all workers, in dispatch order. For test assertions.
84    seen_request_ids: std::sync::Arc<std::sync::Mutex<Vec<String>>>,
85    /// Spawn indices (0-based) whose workers should panic on the first execute call.
86    panic_slots: std::collections::HashSet<u32>,
87}
88
89impl MockRuntime {
90    /// Create a mock runtime that echoes the payload back as the result.
91    pub fn echo() -> Self {
92        Self {
93            responder: std::sync::Arc::new(|_method, payload| Ok(payload.clone())),
94            next_id: std::sync::atomic::AtomicU32::new(10000),
95            seen_request_ids: std::sync::Arc::new(std::sync::Mutex::new(Vec::new())),
96            panic_slots: std::collections::HashSet::default(),
97        }
98    }
99
100    /// Create a runtime where workers at the given spawn indices (0-based) panic
101    /// inside `execute`. The panic propagates through the supervisor task, dropping
102    /// the slot's inbox receiver — used to test dead-slot detection in the pool.
103    pub fn with_panicking_slots(indices: &[u32]) -> Self {
104        Self {
105            responder: std::sync::Arc::new(|_method, payload| Ok(payload.clone())),
106            next_id: std::sync::atomic::AtomicU32::new(10000),
107            seen_request_ids: std::sync::Arc::new(std::sync::Mutex::new(Vec::new())),
108            panic_slots: indices.iter().copied().collect(),
109        }
110    }
111
112    /// Number of workers spawned so far. Useful for asserting recycling.
113    pub fn spawn_count(&self) -> u32 {
114        self.next_id.load(std::sync::atomic::Ordering::Relaxed) - 10000
115    }
116
117    /// Request ids passed to `execute`, in the order they were dispatched.
118    pub fn seen_request_ids(&self) -> Vec<String> {
119        self.seen_request_ids.lock().unwrap().clone()
120    }
121}
122
123#[async_trait]
124impl Runtime for MockRuntime {
125    async fn spawn(&self) -> Result<Box<dyn WorkerHandle>> {
126        let id = self
127            .next_id
128            .fetch_add(1, std::sync::atomic::Ordering::Relaxed);
129        let spawn_index = id - 10000;
130        let should_panic = self.panic_slots.contains(&spawn_index);
131        Ok(Box::new(MockWorker {
132            id,
133            responder: self.responder.clone(),
134            seen_request_ids: self.seen_request_ids.clone(),
135            terminated: false,
136            should_panic,
137        }))
138    }
139}
140
141/// In-memory worker used by `MockRuntime`.
142pub struct MockWorker {
143    id: u32,
144    responder: MockResponder,
145    seen_request_ids: std::sync::Arc<std::sync::Mutex<Vec<String>>>,
146    terminated: bool,
147    /// If true, `execute` panics — used to simulate a supervisor task crash.
148    should_panic: bool,
149}
150
151#[async_trait]
152impl WorkerHandle for MockWorker {
153    fn id(&self) -> u32 {
154        self.id
155    }
156
157    async fn ready(&mut self) -> Result<()> {
158        Ok(())
159    }
160
161    async fn execute(
162        &mut self,
163        method: &str,
164        payload: serde_json::Value,
165        request_id: Arc<str>,
166    ) -> Result<serde_json::Value> {
167        assert!(!self.should_panic, "simulated slot failure");
168        if self.terminated {
169            anyhow::bail!("worker terminated");
170        }
171        self.seen_request_ids
172            .lock()
173            .unwrap()
174            .push(request_id.to_string());
175        (self.responder)(method, &payload)
176    }
177
178    async fn terminate(&mut self) -> Result<()> {
179        self.terminated = true;
180        Ok(())
181    }
182}