Skip to main content

folk_ext/
runtime.rs

1//! Extension runtime: workers via channels.
2//!
3//! Two modes:
4//! - **Single-worker (NTS):** Main PHP thread is the worker. Pre-connected channels.
5//! - **Multi-worker (ZTS):** Additional worker threads spawned from Rust,
6//!   each with its own PHP context via TSRM.
7//!
8//! Uses `std::sync::mpsc` for task dispatch (worker thread blocks on recv).
9//! Uses `tokio::sync::oneshot` for reply (no blocking on tokio side).
10
11use std::sync::atomic::{AtomicU32, Ordering};
12use std::sync::mpsc;
13use std::thread;
14use std::time::Instant;
15
16use anyhow::Result;
17use async_trait::async_trait;
18use folk_core::config::WorkersConfig;
19use folk_core::runtime::{Runtime, WorkerHandle};
20use tracing::{debug, info, warn};
21
22use crate::bridge;
23use crate::worker;
24use crate::zts;
25
26static NEXT_WORKER_ID: AtomicU32 = AtomicU32::new(1);
27
28/// Tx side of a worker channel pair (kept by the runtime/pool).
29pub struct WorkerTxSide {
30    pub task_tx: mpsc::SyncSender<bridge::TaskRequest>,
31    pub ready_rx: mpsc::Receiver<()>,
32}
33
34/// Extension runtime — manages worker channels and ZTS threads.
35pub struct ExtensionRuntime {
36    config: WorkersConfig,
37    /// Pre-created channel pairs for NTS mode (main thread worker).
38    channels: std::sync::Mutex<Vec<WorkerTxSide>>,
39}
40
41impl ExtensionRuntime {
42    /// Create a runtime with pre-connected channels (for the main thread worker).
43    pub fn new(config: WorkersConfig, tx_sides: Vec<WorkerTxSide>) -> Self {
44        Self {
45            config,
46            channels: std::sync::Mutex::new(tx_sides),
47        }
48    }
49
50    /// Spawn a ZTS worker thread with fresh channels.
51    #[allow(clippy::unnecessary_wraps)] // Result for consistency with Runtime trait
52    fn spawn_zts_worker(&self) -> Result<Box<dyn WorkerHandle>> {
53        let worker_id = NEXT_WORKER_ID.fetch_add(1, Ordering::Relaxed);
54        // ZTS threads may have a different CWD after php_request_startup(),
55        // so resolve to absolute path before spawning.
56        let script = std::env::current_dir()
57            .unwrap_or_default()
58            .join(&self.config.script)
59            .to_string_lossy()
60            .into_owned();
61
62        let (task_tx, task_rx) = mpsc::sync_channel::<bridge::TaskRequest>(8);
63        let (ready_tx, ready_rx) = mpsc::sync_channel::<()>(1);
64
65        let thread_handle = worker::spawn_zts_worker(worker_id, script, task_rx, ready_tx);
66
67        debug!(worker_id, "ZTS worker thread spawned");
68
69        Ok(Box::new(ChannelWorkerHandle {
70            worker_id,
71            task_tx: Some(task_tx),
72            ready_rx: Some(ready_rx),
73            thread_handle: Some(thread_handle),
74        }))
75    }
76
77    /// Take a pre-connected channel pair (for the main thread / NTS worker).
78    fn take_preconnected(&self) -> Result<Box<dyn WorkerHandle>> {
79        let worker_id = NEXT_WORKER_ID.fetch_add(1, Ordering::Relaxed);
80        let tx_side = self.channels.lock().unwrap().pop().ok_or_else(|| {
81            anyhow::anyhow!("no more pre-connected channels (worker {worker_id})")
82        })?;
83
84        debug!(worker_id, "pre-connected worker channel taken");
85
86        Ok(Box::new(ChannelWorkerHandle {
87            worker_id,
88            task_tx: Some(tx_side.task_tx),
89            ready_rx: Some(tx_side.ready_rx),
90            thread_handle: None, // main thread — not managed by us
91        }))
92    }
93}
94
95#[async_trait]
96impl Runtime for ExtensionRuntime {
97    async fn spawn(&self) -> Result<Box<dyn WorkerHandle>> {
98        let has_preconnected = !self.channels.lock().unwrap().is_empty();
99
100        if has_preconnected {
101            self.take_preconnected()
102        } else if self.config.count > 1 {
103            self.spawn_zts_worker()
104        } else {
105            anyhow::bail!("no workers available and ZTS multi-worker not requested")
106        }
107    }
108
109    async fn warmup(&self) -> Result<()> {
110        if !zts::is_zts() {
111            debug!("opcache warmup: skipping (NTS mode)");
112            return Ok(());
113        }
114
115        let project_dir = std::env::current_dir().unwrap_or_default();
116        let classmap_path = project_dir.join("vendor/composer/autoload_classmap.php");
117
118        if !classmap_path.exists() {
119            warn!("opcache warmup: vendor/composer/autoload_classmap.php not found, skipping");
120            return Ok(());
121        }
122
123        let classmap_path_str = classmap_path.to_string_lossy().into_owned();
124        let warmup_code = format!(
125            r"
126$classmap = require '{classmap_path_str}';
127$loaded = 0;
128foreach ($classmap as $class => $file) {{
129    if (is_file($file) && function_exists('opcache_compile_file')) {{
130        @opcache_compile_file($file);
131        $loaded++;
132    }}
133}}
134"
135        );
136
137        let start = Instant::now();
138
139        // Run warmup in a dedicated thread (ZTS requires thread-local TSRM context).
140        let result = tokio::task::spawn_blocking(move || {
141            let _guard = zts::ZtsThreadGuard::new();
142            zts::request_startup()?;
143            let exec_result = zts::eval_string(&warmup_code);
144            zts::request_shutdown();
145            exec_result
146        })
147        .await
148        .map_err(|e| anyhow::anyhow!("warmup thread panicked: {e}"))?;
149
150        let elapsed = start.elapsed();
151        let elapsed_ms = u32::try_from(elapsed.as_millis()).unwrap_or(u32::MAX);
152        match result {
153            Ok(()) => info!(elapsed_ms, "opcache warmup done"),
154            Err(e) => {
155                warn!(error = %e, elapsed_ms, "opcache warmup script failed");
156            },
157        }
158
159        Ok(())
160    }
161}
162
163/// Handle connected to a worker via channels.
164pub struct ChannelWorkerHandle {
165    worker_id: u32,
166    task_tx: Option<mpsc::SyncSender<bridge::TaskRequest>>,
167    ready_rx: Option<mpsc::Receiver<()>>,
168    thread_handle: Option<thread::JoinHandle<()>>,
169}
170
171#[async_trait]
172impl WorkerHandle for ChannelWorkerHandle {
173    fn id(&self) -> u32 {
174        self.worker_id
175    }
176
177    async fn ready(&mut self) -> Result<()> {
178        if let Some(rx) = self.ready_rx.take() {
179            tokio::task::spawn_blocking(move || rx.recv())
180                .await
181                .map_err(|e| anyhow::anyhow!("spawn_blocking panicked: {e}"))?
182                .map_err(|_| anyhow::anyhow!("worker died before ready"))?;
183        }
184        Ok(())
185    }
186
187    async fn execute(
188        &mut self,
189        method: &str,
190        payload: serde_json::Value,
191    ) -> Result<serde_json::Value> {
192        let tx = self
193            .task_tx
194            .as_ref()
195            .ok_or_else(|| anyhow::anyhow!("worker terminated"))?
196            .clone();
197
198        let method = method.to_string();
199
200        // tokio oneshot for reply — send() is lock-free, recv() is async.
201        let (reply_tx, reply_rx) = tokio::sync::oneshot::channel();
202
203        // SyncSender::send blocks only when channel is full (capacity=8).
204        // With semaphore=4, at most 4 in-flight — never blocks.
205        tx.send(bridge::TaskRequest {
206            method,
207            payload,
208            reply: reply_tx,
209        })
210        .map_err(|_| anyhow::anyhow!("worker process gone"))?;
211
212        // Await reply asynchronously — no spawn_blocking needed!
213        reply_rx
214            .await
215            .map_err(|_| anyhow::anyhow!("worker dropped reply"))?
216    }
217
218    async fn terminate(&mut self) -> Result<()> {
219        // Close channel — dispatch loop will exit.
220        self.task_tx.take();
221
222        // Wait for ZTS thread to finish PHP cleanup (ts_free_thread etc).
223        if let Some(handle) = self.thread_handle.take() {
224            tokio::task::spawn_blocking(move || {
225                let _ = handle.join();
226            })
227            .await
228            .map_err(|e| anyhow::anyhow!("spawn_blocking panicked: {e}"))?;
229        }
230
231        Ok(())
232    }
233
234    fn is_recyclable(&self) -> bool {
235        // Main thread (preconnected) cannot be recycled — it IS the PHP process.
236        self.thread_handle.is_some()
237    }
238}