1use std::sync::atomic::{AtomicU32, Ordering};
12use std::sync::mpsc;
13use std::thread;
14use std::time::Instant;
15
16use anyhow::Result;
17use async_trait::async_trait;
18use folk_core::config::WorkersConfig;
19use folk_core::runtime::{Runtime, WorkerHandle};
20use tracing::{debug, info, warn};
21
22use crate::bridge;
23use crate::worker;
24use crate::zts;
25
26static NEXT_WORKER_ID: AtomicU32 = AtomicU32::new(1);
27
28pub struct WorkerTxSide {
30 pub task_tx: mpsc::SyncSender<bridge::TaskRequest>,
31 pub ready_rx: mpsc::Receiver<()>,
32}
33
34pub struct ExtensionRuntime {
36 config: WorkersConfig,
37 channels: std::sync::Mutex<Vec<WorkerTxSide>>,
39}
40
41impl ExtensionRuntime {
42 pub fn new(config: WorkersConfig, tx_sides: Vec<WorkerTxSide>) -> Self {
44 Self {
45 config,
46 channels: std::sync::Mutex::new(tx_sides),
47 }
48 }
49
50 #[allow(clippy::unnecessary_wraps)] fn spawn_zts_worker(&self) -> Result<Box<dyn WorkerHandle>> {
53 let worker_id = NEXT_WORKER_ID.fetch_add(1, Ordering::Relaxed);
54 let script = std::env::current_dir()
57 .unwrap_or_default()
58 .join(&self.config.script)
59 .to_string_lossy()
60 .into_owned();
61
62 let (task_tx, task_rx) = mpsc::sync_channel::<bridge::TaskRequest>(8);
63 let (ready_tx, ready_rx) = mpsc::sync_channel::<()>(1);
64
65 let thread_handle = worker::spawn_zts_worker(worker_id, script, task_rx, ready_tx);
66
67 debug!(worker_id, "ZTS worker thread spawned");
68
69 Ok(Box::new(ChannelWorkerHandle {
70 worker_id,
71 task_tx: Some(task_tx),
72 ready_rx: Some(ready_rx),
73 thread_handle: Some(thread_handle),
74 }))
75 }
76
77 fn take_preconnected(&self) -> Result<Box<dyn WorkerHandle>> {
79 let worker_id = NEXT_WORKER_ID.fetch_add(1, Ordering::Relaxed);
80 let tx_side = self.channels.lock().unwrap().pop().ok_or_else(|| {
81 anyhow::anyhow!("no more pre-connected channels (worker {worker_id})")
82 })?;
83
84 debug!(worker_id, "pre-connected worker channel taken");
85
86 Ok(Box::new(ChannelWorkerHandle {
87 worker_id,
88 task_tx: Some(tx_side.task_tx),
89 ready_rx: Some(tx_side.ready_rx),
90 thread_handle: None, }))
92 }
93}
94
95#[async_trait]
96impl Runtime for ExtensionRuntime {
97 async fn spawn(&self) -> Result<Box<dyn WorkerHandle>> {
98 let has_preconnected = !self.channels.lock().unwrap().is_empty();
99
100 if has_preconnected {
101 self.take_preconnected()
102 } else if self.config.count > 1 {
103 self.spawn_zts_worker()
104 } else {
105 anyhow::bail!("no workers available and ZTS multi-worker not requested")
106 }
107 }
108
109 async fn warmup(&self) -> Result<()> {
110 if !zts::is_zts() {
111 debug!("opcache warmup: skipping (NTS mode)");
112 return Ok(());
113 }
114
115 let project_dir = std::env::current_dir().unwrap_or_default();
116 let classmap_path = project_dir.join("vendor/composer/autoload_classmap.php");
117
118 if !classmap_path.exists() {
119 warn!("opcache warmup: vendor/composer/autoload_classmap.php not found, skipping");
120 return Ok(());
121 }
122
123 let classmap_path_str = classmap_path.to_string_lossy().into_owned();
124 let warmup_code = format!(
125 r"
126$classmap = require '{classmap_path_str}';
127$loaded = 0;
128foreach ($classmap as $class => $file) {{
129 if (is_file($file) && function_exists('opcache_compile_file')) {{
130 @opcache_compile_file($file);
131 $loaded++;
132 }}
133}}
134"
135 );
136
137 let start = Instant::now();
138
139 let result = tokio::task::spawn_blocking(move || {
141 let _guard = zts::ZtsThreadGuard::new();
142 zts::request_startup()?;
143 let exec_result = zts::eval_string(&warmup_code);
144 zts::request_shutdown();
145 exec_result
146 })
147 .await
148 .map_err(|e| anyhow::anyhow!("warmup thread panicked: {e}"))?;
149
150 let elapsed = start.elapsed();
151 let elapsed_ms = u32::try_from(elapsed.as_millis()).unwrap_or(u32::MAX);
152 match result {
153 Ok(()) => info!(elapsed_ms, "opcache warmup done"),
154 Err(e) => {
155 warn!(error = %e, elapsed_ms, "opcache warmup script failed");
156 },
157 }
158
159 Ok(())
160 }
161
162 }
171
172pub struct ChannelWorkerHandle {
174 worker_id: u32,
175 task_tx: Option<mpsc::SyncSender<bridge::TaskRequest>>,
176 ready_rx: Option<mpsc::Receiver<()>>,
177 thread_handle: Option<thread::JoinHandle<()>>,
178}
179
180#[async_trait]
181impl WorkerHandle for ChannelWorkerHandle {
182 fn id(&self) -> u32 {
183 self.worker_id
184 }
185
186 async fn ready(&mut self) -> Result<()> {
187 if let Some(rx) = self.ready_rx.take() {
188 tokio::task::spawn_blocking(move || rx.recv())
189 .await
190 .map_err(|e| anyhow::anyhow!("spawn_blocking panicked: {e}"))?
191 .map_err(|_| anyhow::anyhow!("worker died before ready"))?;
192 }
193 Ok(())
194 }
195
196 async fn execute(
197 &mut self,
198 method: &str,
199 payload: serde_json::Value,
200 ) -> Result<serde_json::Value> {
201 let tx = self
202 .task_tx
203 .as_ref()
204 .ok_or_else(|| anyhow::anyhow!("worker terminated"))?
205 .clone();
206
207 let method = method.to_string();
208
209 let (reply_tx, reply_rx) = tokio::sync::oneshot::channel();
211
212 tx.send(bridge::TaskRequest {
215 method,
216 payload,
217 reply: reply_tx,
218 })
219 .map_err(|_| anyhow::anyhow!("worker process gone"))?;
220
221 reply_rx
223 .await
224 .map_err(|_| anyhow::anyhow!("worker dropped reply"))?
225 }
226
227 async fn terminate(&mut self) -> Result<()> {
228 self.task_tx.take();
230
231 if let Some(handle) = self.thread_handle.take() {
233 tokio::task::spawn_blocking(move || {
234 let _ = handle.join();
235 })
236 .await
237 .map_err(|e| anyhow::anyhow!("spawn_blocking panicked: {e}"))?;
238 }
239
240 Ok(())
241 }
242
243 fn is_recyclable(&self) -> bool {
244 self.thread_handle.is_some()
246 }
247}