Skip to main content

folk_core/
runtime.rs

1//! Abstraction over how PHP workers are spawned.
2//!
3//! `WorkerPool` does not know what a PHP process is. It asks the
4//! `Runtime` to spawn a worker; the runtime returns a `WorkerHandle` which
5//! gives the pool a way to execute requests and to terminate the worker.
6//!
7//! In phase 23 (extension mode), the runtime spawns OS threads that run PHP
8//! inside the same process. Communication is via channels (zero IPC).
9
10use anyhow::Result;
11use async_trait::async_trait;
12
13/// A handle to a spawned worker.
14///
15/// The pool dispatches requests via `execute` and controls lifecycle via
16/// `ready` and `terminate`.
17#[async_trait]
18pub trait WorkerHandle: Send + 'static {
19    /// Worker identifier (thread ID, PID, or synthetic).
20    fn id(&self) -> u32;
21
22    /// Wait for the worker to signal readiness.
23    /// Returns once the worker has booted and is ready to accept requests.
24    async fn ready(&mut self) -> Result<()>;
25
26    /// Execute a single request: send structured data, receive result.
27    ///
28    /// `request_id` is a unique, monotonic id for this request, exposed to PHP
29    /// via `folk_request_id()` for log correlation.
30    async fn execute(
31        &mut self,
32        method: &str,
33        payload: serde_json::Value,
34        request_id: u64,
35    ) -> Result<serde_json::Value>;
36
37    /// Terminate the worker. Implementations should signal shutdown and
38    /// wait for the worker to exit.
39    async fn terminate(&mut self) -> Result<()>;
40
41    /// Whether this worker can be recycled (terminated and respawned).
42    /// Returns `false` for the main thread worker which cannot be restarted.
43    fn is_recyclable(&self) -> bool {
44        true
45    }
46}
47
48/// Spawns workers per a runtime-specific strategy.
49#[async_trait]
50pub trait Runtime: Send + Sync + 'static {
51    /// Spawn a single worker and return a handle.
52    ///
53    /// The caller must call `ready()` before dispatching requests.
54    async fn spawn(&self) -> Result<Box<dyn WorkerHandle>>;
55
56    /// Warm up shared caches (OPcache) before spawning workers.
57    /// Called once at startup. Default: no-op.
58    async fn warmup(&self) -> Result<()> {
59        Ok(())
60    }
61
62    /// Invalidate compiled-code caches so respawned workers pick up changes.
63    ///
64    /// Called by the dev-mode file watcher just before workers are recycled.
65    /// In ZTS the OPcache is shared process-wide, so a single reset affects
66    /// every worker thread. Default: no-op.
67    async fn reload(&self) -> Result<()> {
68        Ok(())
69    }
70}
71
72// --- MockRuntime: in-memory runtime for tests ---
73
74type MockResponder =
75    std::sync::Arc<dyn Fn(&str, &serde_json::Value) -> Result<serde_json::Value> + Send + Sync>;
76
77/// In-memory runtime used in tests. Each spawned worker echoes requests back.
78pub struct MockRuntime {
79    responder: MockResponder,
80    next_id: std::sync::atomic::AtomicU32,
81    /// Request ids observed by all workers, in dispatch order. For test assertions.
82    seen_request_ids: std::sync::Arc<std::sync::Mutex<Vec<u64>>>,
83}
84
85impl MockRuntime {
86    /// Create a mock runtime that echoes the payload back as the result.
87    pub fn echo() -> Self {
88        Self {
89            responder: std::sync::Arc::new(|_method, payload| Ok(payload.clone())),
90            next_id: std::sync::atomic::AtomicU32::new(10000),
91            seen_request_ids: std::sync::Arc::new(std::sync::Mutex::new(Vec::new())),
92        }
93    }
94
95    /// Number of workers spawned so far. Useful for asserting recycling.
96    pub fn spawn_count(&self) -> u32 {
97        self.next_id.load(std::sync::atomic::Ordering::Relaxed) - 10000
98    }
99
100    /// Request ids passed to `execute`, in the order they were dispatched.
101    pub fn seen_request_ids(&self) -> Vec<u64> {
102        self.seen_request_ids.lock().unwrap().clone()
103    }
104}
105
106#[async_trait]
107impl Runtime for MockRuntime {
108    async fn spawn(&self) -> Result<Box<dyn WorkerHandle>> {
109        let id = self
110            .next_id
111            .fetch_add(1, std::sync::atomic::Ordering::Relaxed);
112        Ok(Box::new(MockWorker {
113            id,
114            responder: self.responder.clone(),
115            seen_request_ids: self.seen_request_ids.clone(),
116            terminated: false,
117        }))
118    }
119}
120
121/// In-memory worker used by `MockRuntime`.
122pub struct MockWorker {
123    id: u32,
124    responder: MockResponder,
125    seen_request_ids: std::sync::Arc<std::sync::Mutex<Vec<u64>>>,
126    terminated: bool,
127}
128
129#[async_trait]
130impl WorkerHandle for MockWorker {
131    fn id(&self) -> u32 {
132        self.id
133    }
134
135    async fn ready(&mut self) -> Result<()> {
136        Ok(())
137    }
138
139    async fn execute(
140        &mut self,
141        method: &str,
142        payload: serde_json::Value,
143        request_id: u64,
144    ) -> Result<serde_json::Value> {
145        if self.terminated {
146            anyhow::bail!("worker terminated");
147        }
148        self.seen_request_ids.lock().unwrap().push(request_id);
149        (self.responder)(method, &payload)
150    }
151
152    async fn terminate(&mut self) -> Result<()> {
153        self.terminated = true;
154        Ok(())
155    }
156}