Skip to main content

folk_core/
worker_pool.rs

1//! Worker pool: dispatches requests to PHP workers, manages slot lifecycle.
2//!
3//! See `folk-spec/spec/03-worker-lifecycle.md` for the design.
4
5use std::sync::Arc;
6use std::sync::atomic::{AtomicU64, Ordering};
7use std::time::Duration;
8
9use anyhow::{Context, Result, anyhow};
10use async_trait::async_trait;
11use bytes::Bytes;
12use folk_api::Executor;
13use tokio::sync::{Semaphore, mpsc, oneshot, watch};
14use tokio::task::JoinHandle;
15use tracing::{debug, error, info, warn};
16
17use crate::config::WorkersConfig;
18use crate::runtime::{Runtime, WorkerHandle};
19use crate::worker_slot::SlotInfo;
20
/// Pool errors. Plugins typically translate these into their own domain errors.
///
/// The `#[error(...)]` string on each variant is its `Display` text (generated
/// by `thiserror`).
#[derive(Debug, thiserror::Error)]
pub enum WorkError {
    /// Displayed as "all workers busy".
    // NOTE(review): not constructed anywhere in the visible part of this file;
    // presumably meant for a "no free worker" fast-fail path — confirm.
    #[error("all workers busy")]
    Busy,
    /// Displayed as "worker died during request".
    // NOTE(review): not constructed anywhere in the visible part of this file.
    #[error("worker died during request")]
    WorkerDied,
    /// Returned by `dispatch_one` when the worker's `execute` call does not
    /// finish within the configured execution timeout.
    #[error("execution timed out")]
    Timeout,
    /// An error reported by the application running inside the worker.
    ///
    /// Only `message` appears in the `Display` text; `code` is carried as data.
    // NOTE(review): not constructed anywhere in the visible part of this file.
    #[error("worker returned application error: {message}")]
    Application { code: i32, message: String },
    /// Returned by `dispatch_one` for any error coming back from the worker's
    /// `execute` call; the payload is that error rendered as text.
    #[error("internal error: {0}")]
    Internal(String),
}
35
/// One dispatch request: id, method name, payload + reply channel.
///
/// Built by `WorkerPool::dispatch_value`, forwarded by `pool_main` to one slot
/// inbox, and consumed by that slot's `slot_supervisor`.
struct DispatchRequest {
    /// Id taken from the pool's `next_request_id` counter; handed to the
    /// worker's `execute` call.
    request_id: u64,
    /// Method name handed to the worker's `execute` call.
    method: String,
    /// JSON payload handed to the worker's `execute` call.
    payload: serde_json::Value,
    /// One-shot channel on which the supervisor sends the outcome back to the
    /// caller waiting in `dispatch_value`. If it is dropped without a send,
    /// the caller sees "pool dropped reply channel".
    reply: oneshot::Sender<Result<serde_json::Value>>,
}
43
/// Worker pool — the dispatch surface.
///
/// Callers go through the `Executor` impl; requests are queued to a background
/// pool task which hands them to per-slot supervisors.
pub struct WorkerPool {
    /// Queue into the background pool task (`pool_main`); created with a
    /// capacity of 1024 in `new`.
    request_tx: mpsc::Sender<DispatchRequest>,
    /// Created with `config.count` permits. `dispatch_value` holds one permit
    /// for the whole lifetime of a request, capping requests in flight.
    semaphore: Arc<Semaphore>,
    /// Monotonic generator for per-request ids. Starts at 1; 0 means "no request".
    next_request_id: AtomicU64,
    /// Runtime used to spawn workers; `trigger_reload` also calls its
    /// `reload()` before signalling the supervisors.
    runtime: Arc<dyn Runtime>,
    /// Monotonic reload generation. Bumped by `trigger_reload`; observed by
    /// slot supervisors to recycle their workers after the current request.
    reload_tx: watch::Sender<u64>,
    /// Handle of the spawned `pool_main` task. It is stored but neither
    /// awaited nor aborted anywhere in this file.
    _pool_task: JoinHandle<()>,
}
56
57impl WorkerPool {
58    /// Construct a pool with `config.count` workers spawned via `runtime`.
59    ///
60    /// Returns once the pool task is started. Workers boot asynchronously
61    /// in the background.
62    pub fn new(runtime: Arc<dyn Runtime>, config: WorkersConfig) -> Result<Arc<Self>> {
63        let semaphore = Arc::new(Semaphore::new(config.count));
64        let (request_tx, request_rx) = mpsc::channel::<DispatchRequest>(1024);
65        let (reload_tx, reload_rx) = watch::channel(0u64);
66
67        let pool_task = tokio::spawn(pool_main(
68            runtime.clone(),
69            config,
70            request_rx,
71            semaphore.clone(),
72            reload_rx,
73        ));
74
75        Ok(Arc::new(Self {
76            request_tx,
77            semaphore,
78            next_request_id: AtomicU64::new(1),
79            runtime,
80            reload_tx,
81            _pool_task: pool_task,
82        }))
83    }
84
85    /// Trigger a hot reload: invalidate compiled-code caches, then signal all
86    /// recyclable workers to restart after their current request completes.
87    ///
88    /// Non-recyclable workers (the main PHP thread) keep running — see the
89    /// dev-mode docs for the implications.
90    pub async fn trigger_reload(&self) {
91        if let Err(e) = self.runtime.reload().await {
92            warn!(error = %e, "reload: cache invalidation failed; recycling anyway");
93        }
94        self.reload_tx.send_modify(|g| *g += 1);
95        let generation = *self.reload_tx.borrow();
96        info!(generation, "hot reload triggered; recycling workers");
97    }
98
99    /// Dispatch a Value-based request through the pool.
100    async fn dispatch_value(
101        &self,
102        method: &str,
103        payload: serde_json::Value,
104    ) -> Result<serde_json::Value> {
105        let permit = self
106            .semaphore
107            .clone()
108            .acquire_owned()
109            .await
110            .context("pool semaphore closed")?;
111
112        let request_id = self.next_request_id.fetch_add(1, Ordering::Relaxed);
113        let (reply_tx, reply_rx) = oneshot::channel();
114        self.request_tx
115            .send(DispatchRequest {
116                request_id,
117                method: method.to_string(),
118                payload,
119                reply: reply_tx,
120            })
121            .await
122            .map_err(|_| anyhow!("pool task gone"))?;
123
124        let result = reply_rx
125            .await
126            .map_err(|_| anyhow!("pool dropped reply channel"))?;
127
128        drop(permit);
129        result
130    }
131}
132
133#[async_trait]
134impl Executor for WorkerPool {
135    async fn execute_method(&self, method: &str, payload: Bytes) -> Result<Bytes> {
136        debug!(
137            method,
138            payload_len = payload.len(),
139            "pool: execute_method called (bytes path)"
140        );
141        // Legacy path: parse JSON bytes → Value → dispatch → Value → serialize
142        let value: serde_json::Value =
143            serde_json::from_slice(&payload).context("pool: failed to parse payload as JSON")?;
144        let result = self.dispatch_value(method, value).await?;
145        let bytes = serde_json::to_vec(&result).context("pool: failed to serialize response")?;
146        Ok(Bytes::from(bytes))
147    }
148
149    async fn execute_value(
150        &self,
151        method: &str,
152        payload: serde_json::Value,
153    ) -> Result<serde_json::Value> {
154        debug!(method, "pool: execute_value called (zero-copy path)");
155        self.dispatch_value(method, payload).await
156    }
157}
158
159// ---- Pool task ---------------------------------------------------------------
160
161async fn pool_main(
162    runtime: Arc<dyn Runtime>,
163    config: WorkersConfig,
164    mut request_rx: mpsc::Receiver<DispatchRequest>,
165    _semaphore: Arc<Semaphore>,
166    reload_rx: watch::Receiver<u64>,
167) {
168    let mut slot_inboxes: Vec<mpsc::Sender<DispatchRequest>> = Vec::with_capacity(config.count);
169    let mut slot_supervisors: Vec<JoinHandle<()>> = Vec::with_capacity(config.count);
170
171    for slot_id in 0..config.count {
172        let (slot_tx, slot_rx) = mpsc::channel::<DispatchRequest>(8);
173        slot_inboxes.push(slot_tx);
174        let runtime_clone = runtime.clone();
175        let cfg_clone = config.clone();
176        let reload_clone = reload_rx.clone();
177        let supervisor = tokio::spawn(slot_supervisor(
178            slot_id,
179            runtime_clone,
180            cfg_clone,
181            slot_rx,
182            reload_clone,
183        ));
184        slot_supervisors.push(supervisor);
185    }
186
187    // Round-robin dispatch.
188    let mut next: usize = 0;
189    while let Some(req) = request_rx.recv().await {
190        let chosen = next % slot_inboxes.len();
191        next = next.wrapping_add(1);
192
193        if slot_inboxes[chosen].send(req).await.is_err() {
194            warn!(slot_id = chosen, "slot inbox closed; failed to dispatch");
195        }
196    }
197
198    info!("pool main loop exiting; awaiting supervisors");
199    for handle in slot_supervisors {
200        let _ = handle.await;
201    }
202}
203
204// ---- Slot supervisor ---------------------------------------------------------
205
206async fn slot_supervisor(
207    slot_id: usize,
208    runtime: Arc<dyn Runtime>,
209    config: WorkersConfig,
210    mut inbox: mpsc::Receiver<DispatchRequest>,
211    mut reload_rx: watch::Receiver<u64>,
212) {
213    let mut slot = SlotInfo::new();
214    let mut worker: Option<Box<dyn WorkerHandle>> = None;
215    // Reload generation this worker was booted at. When the pool's generation
216    // advances past this, the worker must be recycled to pick up new code.
217    let mut boot_generation: u64 = *reload_rx.borrow();
218
219    loop {
220        // Spawn a worker if we don't have one.
221        if worker.is_none() {
222            boot_generation = *reload_rx.borrow();
223            match boot_worker(&runtime, &config, &mut slot).await {
224                Ok(w) => worker = Some(w),
225                Err(e) => {
226                    error!(slot_id, error = ?e, "failed to boot worker, will retry");
227                    tokio::time::sleep(Duration::from_secs(1)).await;
228                    continue;
229                },
230            }
231        }
232
233        let recyclable = worker.as_ref().is_some_and(|w| w.is_recyclable());
234
235        // Wait for a request, a reload signal, or shutdown.
236        let req = tokio::select! {
237            biased;
238            // React to a reload while idle so workers restart promptly even
239            // without traffic. Skip for non-recyclable workers (main thread).
240            res = reload_rx.changed(), if recyclable => {
241                if res.is_err() {
242                    // Pool dropped the sender — treat as shutdown.
243                    info!(slot_id, "supervisor shutting down (reload channel closed)");
244                    if let Some(mut w) = worker.take() {
245                        let _ = w.terminate().await;
246                    }
247                    return;
248                }
249                if *reload_rx.borrow() > boot_generation {
250                    info!(slot_id, "recycling idle worker for hot reload");
251                    if let Some(mut w) = worker.take() {
252                        let _ = w.terminate().await;
253                    }
254                    slot = SlotInfo::new();
255                }
256                continue;
257            },
258            maybe_req = inbox.recv() => {
259                let Some(req) = maybe_req else {
260                    info!(slot_id, "supervisor shutting down (inbox closed)");
261                    if let Some(mut w) = worker.take() {
262                        if let Err(e) = w.terminate().await {
263                            warn!(slot_id, error = ?e, "terminate error during shutdown");
264                        }
265                    }
266                    return;
267                };
268                req
269            },
270        };
271
272        // Dispatch.
273        let Some(w) = worker.as_mut() else {
274            unreachable!()
275        };
276        slot.mark_busy();
277        let result = dispatch_one(
278            w.as_mut(),
279            &req.method,
280            req.payload,
281            req.request_id,
282            config.exec_timeout,
283        )
284        .await;
285        slot.mark_idle();
286
287        // Send reply.
288        let _ = req.reply.send(result.map_err(anyhow::Error::from));
289
290        // Recycle on reload (after the request completes) or per the lifecycle
291        // policies (max_jobs / ttl).
292        let reload_pending = *reload_rx.borrow() > boot_generation;
293        if reload_pending || slot.should_recycle(&config) {
294            if let Some(ref w) = worker {
295                if !w.is_recyclable() {
296                    debug!(slot_id, "skipping recycle for non-recyclable worker");
297                    continue;
298                }
299            }
300            let reason = if reload_pending {
301                "hot reload"
302            } else {
303                "lifecycle"
304            };
305            info!(
306                slot_id,
307                jobs = slot.jobs_handled,
308                reason,
309                "recycling worker"
310            );
311            if let Some(mut w) = worker.take() {
312                let _ = w.terminate().await;
313            }
314            slot = SlotInfo::new();
315        }
316    }
317}
318
319async fn boot_worker(
320    runtime: &Arc<dyn Runtime>,
321    config: &WorkersConfig,
322    slot: &mut SlotInfo,
323) -> Result<Box<dyn WorkerHandle>> {
324    debug!("boot_worker: spawning");
325    let mut handle = runtime.spawn().await.context("spawn")?;
326    debug!(id = handle.id(), "boot_worker: waiting for ready");
327
328    let timeout = tokio::time::timeout(config.boot_timeout, handle.ready());
329    match timeout.await {
330        Ok(Ok(())) => {
331            let id = handle.id();
332            slot.mark_ready(id);
333            debug!(id, "worker ready");
334            Ok(handle)
335        },
336        Ok(Err(e)) => {
337            let _ = handle.terminate().await;
338            Err(e).context("worker ready() failed during boot")
339        },
340        Err(_) => {
341            let _ = handle.terminate().await;
342            anyhow::bail!("worker boot timed out after {:?}", config.boot_timeout)
343        },
344    }
345}
346
347async fn dispatch_one(
348    worker: &mut dyn WorkerHandle,
349    method: &str,
350    payload: serde_json::Value,
351    request_id: u64,
352    exec_timeout: Duration,
353) -> Result<serde_json::Value, WorkError> {
354    let recv = tokio::time::timeout(exec_timeout, worker.execute(method, payload, request_id));
355    match recv.await {
356        Ok(Ok(result)) => Ok(result),
357        Ok(Err(e)) => Err(WorkError::Internal(e.to_string())),
358        Err(_) => Err(WorkError::Timeout),
359    }
360}