Skip to main content

folk_core/
runtime.rs

1//! Abstraction over how PHP workers are spawned.
2//!
3//! `WorkerPool` does not know what a PHP process is. It asks the
4//! `Runtime` to spawn a worker; the runtime returns a `WorkerHandle` which
5//! gives the pool a way to execute requests and to terminate the worker.
6//!
7//! In phase 23 (extension mode), the runtime spawns OS threads that run PHP
8//! inside the same process. Communication is via channels (zero IPC).
9
10use std::sync::Arc;
11
12use anyhow::Result;
13use async_trait::async_trait;
14
15/// A handle to a spawned worker.
16///
17/// The pool dispatches requests via `execute` and controls lifecycle via
18/// `ready` and `terminate`.
19#[async_trait]
20pub trait WorkerHandle: Send + 'static {
21    /// Worker identifier (thread ID, PID, or synthetic).
22    fn id(&self) -> u32;
23
24    /// Wait for the worker to signal readiness.
25    /// Returns once the worker has booted and is ready to accept requests.
26    async fn ready(&mut self) -> Result<()>;
27
28    /// Execute a single request: send structured data, receive result.
29    ///
30    /// `request_id` is a globally-unique id (UUID v7) for this request, exposed
31    /// to PHP via `folk_request_id()` for log correlation.
32    async fn execute(
33        &mut self,
34        method: &str,
35        payload: serde_json::Value,
36        request_id: Arc<str>,
37    ) -> Result<serde_json::Value>;
38
39    /// Terminate the worker. Implementations should signal shutdown and
40    /// wait for the worker to exit.
41    async fn terminate(&mut self) -> Result<()>;
42
43    /// Whether this worker can be recycled (terminated and respawned).
44    /// Returns `false` for the main thread worker which cannot be restarted.
45    fn is_recyclable(&self) -> bool {
46        true
47    }
48}
49
50/// Spawns workers per a runtime-specific strategy.
51#[async_trait]
52pub trait Runtime: Send + Sync + 'static {
53    /// Spawn a single worker and return a handle.
54    ///
55    /// The caller must call `ready()` before dispatching requests.
56    async fn spawn(&self) -> Result<Box<dyn WorkerHandle>>;
57
58    /// Warm up shared caches (OPcache) before spawning workers.
59    /// Called once at startup. Default: no-op.
60    async fn warmup(&self) -> Result<()> {
61        Ok(())
62    }
63
64    /// Invalidate compiled-code caches so respawned workers pick up changes.
65    ///
66    /// Called by the dev-mode file watcher just before workers are recycled.
67    /// In ZTS the OPcache is shared process-wide, so a single reset affects
68    /// every worker thread. Default: no-op.
69    async fn reload(&self) -> Result<()> {
70        Ok(())
71    }
72}
73
74// --- MockRuntime: in-memory runtime for tests ---
75
76type MockResponder =
77    std::sync::Arc<dyn Fn(&str, &serde_json::Value) -> Result<serde_json::Value> + Send + Sync>;
78
79/// In-memory runtime used in tests. Each spawned worker echoes requests back.
80pub struct MockRuntime {
81    responder: MockResponder,
82    next_id: std::sync::atomic::AtomicU32,
83    /// Request ids observed by all workers, in dispatch order. For test assertions.
84    seen_request_ids: std::sync::Arc<std::sync::Mutex<Vec<String>>>,
85}
86
87impl MockRuntime {
88    /// Create a mock runtime that echoes the payload back as the result.
89    pub fn echo() -> Self {
90        Self {
91            responder: std::sync::Arc::new(|_method, payload| Ok(payload.clone())),
92            next_id: std::sync::atomic::AtomicU32::new(10000),
93            seen_request_ids: std::sync::Arc::new(std::sync::Mutex::new(Vec::new())),
94        }
95    }
96
97    /// Number of workers spawned so far. Useful for asserting recycling.
98    pub fn spawn_count(&self) -> u32 {
99        self.next_id.load(std::sync::atomic::Ordering::Relaxed) - 10000
100    }
101
102    /// Request ids passed to `execute`, in the order they were dispatched.
103    pub fn seen_request_ids(&self) -> Vec<String> {
104        self.seen_request_ids.lock().unwrap().clone()
105    }
106}
107
108#[async_trait]
109impl Runtime for MockRuntime {
110    async fn spawn(&self) -> Result<Box<dyn WorkerHandle>> {
111        let id = self
112            .next_id
113            .fetch_add(1, std::sync::atomic::Ordering::Relaxed);
114        Ok(Box::new(MockWorker {
115            id,
116            responder: self.responder.clone(),
117            seen_request_ids: self.seen_request_ids.clone(),
118            terminated: false,
119        }))
120    }
121}
122
123/// In-memory worker used by `MockRuntime`.
124pub struct MockWorker {
125    id: u32,
126    responder: MockResponder,
127    seen_request_ids: std::sync::Arc<std::sync::Mutex<Vec<String>>>,
128    terminated: bool,
129}
130
131#[async_trait]
132impl WorkerHandle for MockWorker {
133    fn id(&self) -> u32 {
134        self.id
135    }
136
137    async fn ready(&mut self) -> Result<()> {
138        Ok(())
139    }
140
141    async fn execute(
142        &mut self,
143        method: &str,
144        payload: serde_json::Value,
145        request_id: Arc<str>,
146    ) -> Result<serde_json::Value> {
147        if self.terminated {
148            anyhow::bail!("worker terminated");
149        }
150        self.seen_request_ids
151            .lock()
152            .unwrap()
153            .push(request_id.to_string());
154        (self.responder)(method, &payload)
155    }
156
157    async fn terminate(&mut self) -> Result<()> {
158        self.terminated = true;
159        Ok(())
160    }
161}