1use std::sync::atomic::{AtomicU32, Ordering};
12use std::sync::mpsc;
13use std::thread;
14use std::time::Instant;
15
16use anyhow::Result;
17use async_trait::async_trait;
18use folk_core::config::WorkersConfig;
19use folk_core::runtime::{Runtime, WorkerHandle};
20use tracing::{debug, info, warn};
21
22use crate::bridge;
23use crate::worker;
24use crate::zts;
25
/// Process-wide counter used to hand out worker ids; the first id issued is 1.
///
/// Each worker handle created in this file takes its id with a relaxed
/// `fetch_add(1)`. The counter is only used to obtain distinct numbers, and it
/// wraps around on `u32` overflow.
static NEXT_WORKER_ID: AtomicU32 = AtomicU32::new(1);
27
28pub struct WorkerTxSide {
30 pub task_tx: mpsc::SyncSender<bridge::TaskRequest>,
31 pub ready_rx: mpsc::Receiver<()>,
32}
33
/// Runtime that produces channel-backed worker handles.
pub struct ExtensionRuntime {
    /// Worker settings; this file reads its `script` and `count` fields.
    config: WorkersConfig,
    /// Pre-connected channel endpoints that have not been handed out yet.
    /// Kept behind a mutex because they are removed through `&self`.
    channels: std::sync::Mutex<Vec<WorkerTxSide>>,
}
40
41impl ExtensionRuntime {
42 pub fn new(config: WorkersConfig, tx_sides: Vec<WorkerTxSide>) -> Self {
44 Self {
45 config,
46 channels: std::sync::Mutex::new(tx_sides),
47 }
48 }
49
50 #[allow(clippy::unnecessary_wraps)] fn spawn_zts_worker(&self) -> Result<Box<dyn WorkerHandle>> {
53 let worker_id = NEXT_WORKER_ID.fetch_add(1, Ordering::Relaxed);
54 let script = std::env::current_dir()
57 .unwrap_or_default()
58 .join(&self.config.script)
59 .to_string_lossy()
60 .into_owned();
61
62 let (task_tx, task_rx) = mpsc::sync_channel::<bridge::TaskRequest>(8);
63 let (ready_tx, ready_rx) = mpsc::sync_channel::<()>(1);
64
65 let thread_handle = worker::spawn_zts_worker(worker_id, script, task_rx, ready_tx);
66
67 debug!(worker_id, "ZTS worker thread spawned");
68
69 Ok(Box::new(ChannelWorkerHandle {
70 worker_id,
71 task_tx: Some(task_tx),
72 ready_rx: Some(ready_rx),
73 thread_handle: Some(thread_handle),
74 }))
75 }
76
77 fn take_preconnected(&self) -> Result<Box<dyn WorkerHandle>> {
79 let worker_id = NEXT_WORKER_ID.fetch_add(1, Ordering::Relaxed);
80 let tx_side = self.channels.lock().unwrap().pop().ok_or_else(|| {
81 anyhow::anyhow!("no more pre-connected channels (worker {worker_id})")
82 })?;
83
84 debug!(worker_id, "pre-connected worker channel taken");
85
86 Ok(Box::new(ChannelWorkerHandle {
87 worker_id,
88 task_tx: Some(tx_side.task_tx),
89 ready_rx: Some(tx_side.ready_rx),
90 thread_handle: None, }))
92 }
93}
94
95#[async_trait]
96impl Runtime for ExtensionRuntime {
97 async fn spawn(&self) -> Result<Box<dyn WorkerHandle>> {
98 let has_preconnected = !self.channels.lock().unwrap().is_empty();
99
100 if has_preconnected {
101 self.take_preconnected()
102 } else if self.config.count > 1 {
103 self.spawn_zts_worker()
104 } else {
105 anyhow::bail!("no workers available and ZTS multi-worker not requested")
106 }
107 }
108
109 async fn warmup(&self) -> Result<()> {
110 if !zts::is_zts() {
111 debug!("opcache warmup: skipping (NTS mode)");
112 return Ok(());
113 }
114
115 let project_dir = std::env::current_dir().unwrap_or_default();
116 let classmap_path = project_dir.join("vendor/composer/autoload_classmap.php");
117
118 if !classmap_path.exists() {
119 warn!("opcache warmup: vendor/composer/autoload_classmap.php not found, skipping");
120 return Ok(());
121 }
122
123 let classmap_path_str = classmap_path.to_string_lossy().into_owned();
124 let warmup_code = format!(
125 r"
126$classmap = require '{classmap_path_str}';
127$loaded = 0;
128foreach ($classmap as $class => $file) {{
129 if (is_file($file) && function_exists('opcache_compile_file')) {{
130 @opcache_compile_file($file);
131 $loaded++;
132 }}
133}}
134"
135 );
136
137 let start = Instant::now();
138
139 let result = tokio::task::spawn_blocking(move || {
141 let _guard = zts::ZtsThreadGuard::new();
142 zts::request_startup()?;
143 let exec_result = zts::eval_string(&warmup_code);
144 zts::request_shutdown();
145 exec_result
146 })
147 .await
148 .map_err(|e| anyhow::anyhow!("warmup thread panicked: {e}"))?;
149
150 let elapsed = start.elapsed();
151 let elapsed_ms = u32::try_from(elapsed.as_millis()).unwrap_or(u32::MAX);
152 match result {
153 Ok(()) => info!(elapsed_ms, "opcache warmup done"),
154 Err(e) => {
155 warn!(error = %e, elapsed_ms, "opcache warmup script failed");
156 },
157 }
158
159 Ok(())
160 }
161}
162
163pub struct ChannelWorkerHandle {
165 worker_id: u32,
166 task_tx: Option<mpsc::SyncSender<bridge::TaskRequest>>,
167 ready_rx: Option<mpsc::Receiver<()>>,
168 thread_handle: Option<thread::JoinHandle<()>>,
169}
170
171#[async_trait]
172impl WorkerHandle for ChannelWorkerHandle {
173 fn id(&self) -> u32 {
174 self.worker_id
175 }
176
177 async fn ready(&mut self) -> Result<()> {
178 if let Some(rx) = self.ready_rx.take() {
179 tokio::task::spawn_blocking(move || rx.recv())
180 .await
181 .map_err(|e| anyhow::anyhow!("spawn_blocking panicked: {e}"))?
182 .map_err(|_| anyhow::anyhow!("worker died before ready"))?;
183 }
184 Ok(())
185 }
186
187 async fn execute(
188 &mut self,
189 method: &str,
190 payload: serde_json::Value,
191 ) -> Result<serde_json::Value> {
192 let tx = self
193 .task_tx
194 .as_ref()
195 .ok_or_else(|| anyhow::anyhow!("worker terminated"))?
196 .clone();
197
198 let method = method.to_string();
199
200 let (reply_tx, reply_rx) = tokio::sync::oneshot::channel();
202
203 tx.send(bridge::TaskRequest {
206 method,
207 payload,
208 reply: reply_tx,
209 })
210 .map_err(|_| anyhow::anyhow!("worker process gone"))?;
211
212 reply_rx
214 .await
215 .map_err(|_| anyhow::anyhow!("worker dropped reply"))?
216 }
217
218 async fn terminate(&mut self) -> Result<()> {
219 self.task_tx.take();
221
222 if let Some(handle) = self.thread_handle.take() {
224 tokio::task::spawn_blocking(move || {
225 let _ = handle.join();
226 })
227 .await
228 .map_err(|e| anyhow::anyhow!("spawn_blocking panicked: {e}"))?;
229 }
230
231 Ok(())
232 }
233
234 fn is_recyclable(&self) -> bool {
235 self.thread_handle.is_some()
237 }
238}