folk-core 0.3.3

Server core for Folk PHP application server — worker pool, plugin registry
Documentation
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
//! Worker pool: dispatches requests to PHP workers, manages slot lifecycle.
//!
//! See `folk-spec/spec/03-worker-lifecycle.md` for the design.

use std::sync::Arc;
use std::time::Duration;

use anyhow::{Context, Result, anyhow};
use async_trait::async_trait;
use bytes::Bytes;
use folk_api::Executor;
use tokio::sync::{Semaphore, mpsc, oneshot, watch};
use tokio::task::JoinHandle;
use tracing::{debug, error, info, warn};

use crate::config::WorkersConfig;
use crate::runtime::{Runtime, WorkerHandle};
use crate::worker_slot::SlotInfo;

/// Failure modes of a pool dispatch.
///
/// Plugins usually map these onto errors from their own domain. Within this
/// module only `Timeout` and `Internal` are constructed (by `dispatch_one`);
/// the other variants are available to other producers.
#[derive(Debug, thiserror::Error)]
pub enum WorkError {
    /// Every worker was occupied.
    #[error("all workers busy")]
    Busy,
    /// The worker went away while it was handling the request.
    #[error("worker died during request")]
    WorkerDied,
    /// The request did not finish within the configured execution timeout.
    #[error("execution timed out")]
    Timeout,
    /// The worker reported an application-level failure.
    ///
    /// Note: the display text includes `message` only, not `code`.
    #[error("worker returned application error: {message}")]
    Application { code: i32, message: String },
    /// Any other failure; carries the underlying error's display text.
    #[error("internal error: {0}")]
    Internal(String),
}

/// A single unit of work handed from [`WorkerPool`] to the pool task and on
/// to a slot supervisor, together with the channel its result goes back on.
struct DispatchRequest {
    /// Per-dispatch correlation id (a UUID v7 minted by `dispatch_value`).
    request_id: Arc<str>,
    /// Name of the method the worker should invoke.
    method: String,
    /// JSON arguments for `method`.
    payload: serde_json::Value,
    /// Completes the caller's pending dispatch with the worker's result.
    reply: oneshot::Sender<Result<serde_json::Value>>,
}

/// Worker pool — the dispatch surface.
///
/// Callers submit requests through `request_tx`. A background task (the pool
/// task) routes them to per-slot supervisors, and `semaphore` caps how many
/// requests can be in flight at once.
pub struct WorkerPool {
    /// Feeds the background pool task that routes requests to worker slots.
    request_tx: mpsc::Sender<DispatchRequest>,
    /// One permit per configured worker; a dispatch holds its permit until the
    /// reply arrives.
    semaphore: Arc<Semaphore>,
    /// Spawns workers and performs cache invalidation on hot reload.
    runtime: Arc<dyn Runtime>,
    /// Monotonic reload generation. Bumped by `trigger_reload`; observed by
    /// slot supervisors to recycle their workers after the current request.
    reload_tx: watch::Sender<u64>,
    /// Handle of the background pool task. It is never joined; dropping a
    /// tokio `JoinHandle` detaches the task rather than aborting it.
    _pool_task: JoinHandle<()>,
}

impl WorkerPool {
    /// Construct a pool with `config.count` workers spawned via `runtime`.
    ///
    /// Returns once the pool task is started. Workers boot asynchronously
    /// in the background.
    pub fn new(runtime: Arc<dyn Runtime>, config: WorkersConfig) -> Result<Arc<Self>> {
        let semaphore = Arc::new(Semaphore::new(config.count));
        let (request_tx, request_rx) = mpsc::channel::<DispatchRequest>(1024);
        let (reload_tx, reload_rx) = watch::channel(0u64);

        let pool_task = tokio::spawn(pool_main(
            runtime.clone(),
            config,
            request_rx,
            semaphore.clone(),
            reload_rx,
        ));

        Ok(Arc::new(Self {
            request_tx,
            semaphore,
            runtime,
            reload_tx,
            _pool_task: pool_task,
        }))
    }

    /// Trigger a hot reload: invalidate compiled-code caches, then signal all
    /// recyclable workers to restart after their current request completes.
    ///
    /// Non-recyclable workers (the main PHP thread) keep running — see the
    /// dev-mode docs for the implications.
    pub async fn trigger_reload(&self) {
        if let Err(e) = self.runtime.reload().await {
            warn!(error = %e, "reload: cache invalidation failed; recycling anyway");
        }
        self.reload_tx.send_modify(|g| *g += 1);
        let generation = *self.reload_tx.borrow();
        info!(generation, "hot reload triggered; recycling workers");
    }

    /// Dispatch a Value-based request through the pool.
    ///
    /// Returns the response together with the `request_id` (a UUID v7) generated
    /// for this request — the same id exposed to PHP via `folk_request_id()`.
    async fn dispatch_value(
        &self,
        method: &str,
        payload: serde_json::Value,
    ) -> Result<(serde_json::Value, Arc<str>)> {
        let permit = self
            .semaphore
            .clone()
            .acquire_owned()
            .await
            .context("pool semaphore closed")?;

        // UUID v7: time-ordered (sortable by creation time) and globally unique
        // across instances and restarts — usable as a single correlation key in
        // aggregated logs.
        let request_id: Arc<str> = Arc::from(uuid::Uuid::now_v7().hyphenated().to_string());
        let (reply_tx, reply_rx) = oneshot::channel();
        self.request_tx
            .send(DispatchRequest {
                request_id: request_id.clone(),
                method: method.to_string(),
                payload,
                reply: reply_tx,
            })
            .await
            .map_err(|_| anyhow!("pool task gone"))?;

        let result = reply_rx
            .await
            .map_err(|_| anyhow!("pool dropped reply channel"))?;

        drop(permit);
        result.map(|value| (value, request_id))
    }
}

#[async_trait]
impl Executor for WorkerPool {
    /// Byte-oriented entry point: treats `payload` as JSON, dispatches it and
    /// returns the JSON-encoded response.
    async fn execute_method(&self, method: &str, payload: Bytes) -> Result<Bytes> {
        debug!(
            method,
            payload_len = payload.len(),
            "pool: execute_method called (bytes path)"
        );
        // Legacy path: decode the JSON bytes, dispatch as a Value, then
        // re-encode the response.
        let request: serde_json::Value =
            serde_json::from_slice(&payload).context("pool: failed to parse payload as JSON")?;
        let (response, _request_id) = self.dispatch_value(method, request).await?;
        serde_json::to_vec(&response)
            .map(Bytes::from)
            .context("pool: failed to serialize response")
    }

    /// Value-oriented entry point; the generated request id is discarded.
    async fn execute_value(
        &self,
        method: &str,
        payload: serde_json::Value,
    ) -> Result<serde_json::Value> {
        debug!(method, "pool: execute_value called (zero-copy path)");
        self.dispatch_value(method, payload)
            .await
            .map(|(response, _request_id)| response)
    }

    /// Value-oriented entry point that also returns the request id.
    async fn execute_value_traced(
        &self,
        method: &str,
        payload: serde_json::Value,
    ) -> Result<(serde_json::Value, Arc<str>)> {
        debug!(method, "pool: execute_value_traced called");
        let traced = self.dispatch_value(method, payload).await;
        traced
    }
}

// ---- Pool task ---------------------------------------------------------------

async fn pool_main(
    runtime: Arc<dyn Runtime>,
    config: WorkersConfig,
    mut request_rx: mpsc::Receiver<DispatchRequest>,
    _semaphore: Arc<Semaphore>,
    reload_rx: watch::Receiver<u64>,
) {
    let mut slot_inboxes: Vec<mpsc::Sender<DispatchRequest>> = Vec::with_capacity(config.count);
    let mut slot_supervisors: Vec<JoinHandle<()>> = Vec::with_capacity(config.count);

    for slot_id in 0..config.count {
        let (slot_tx, slot_rx) = mpsc::channel::<DispatchRequest>(8);
        slot_inboxes.push(slot_tx);
        let runtime_clone = runtime.clone();
        let cfg_clone = config.clone();
        let reload_clone = reload_rx.clone();
        let supervisor = tokio::spawn(slot_supervisor(
            slot_id,
            runtime_clone,
            cfg_clone,
            slot_rx,
            reload_clone,
        ));
        slot_supervisors.push(supervisor);
    }

    // Round-robin dispatch: on SendError the slot's inbox is permanently closed
    // (its supervisor task exited/panicked). Mark the slot dead, recover the
    // request value from the error and try the next live slot.
    let n = slot_inboxes.len();
    let mut next: usize = 0;
    let mut dead: Vec<bool> = vec![false; n];

    while let Some(initial_req) = request_rx.recv().await {
        // Wrap in Option so we can move into send() and recover ownership on
        // failure — the Rust borrow checker cannot track that `req = e.0`
        // in the Err arm restores ownership through the loop.
        let mut req: Option<DispatchRequest> = Some(initial_req);
        let mut sent = false;

        for attempt in 0..n {
            let chosen = (next.wrapping_add(attempt)) % n;
            if dead[chosen] {
                continue;
            }
            let r = req.take().expect("req must be Some when slot is alive");
            match slot_inboxes[chosen].send(r).await {
                Ok(()) => {
                    next = chosen.wrapping_add(1);
                    sent = true;
                    break;
                },
                Err(e) => {
                    warn!(slot_id = chosen, "slot inbox closed; skipping dead slot");
                    dead[chosen] = true;
                    req = Some(e.0);
                },
            }
        }

        if !sent {
            error!("all worker slot inboxes closed; cannot dispatch request");
            if let Some(r) = req {
                let _ = r.reply.send(Err(anyhow::anyhow!("all worker slots dead")));
            }
        }
    }

    info!("pool main loop exiting; awaiting supervisors");
    for handle in slot_supervisors {
        let _ = handle.await;
    }
}

// ---- Slot supervisor ---------------------------------------------------------

async fn slot_supervisor(
    slot_id: usize,
    runtime: Arc<dyn Runtime>,
    config: WorkersConfig,
    mut inbox: mpsc::Receiver<DispatchRequest>,
    mut reload_rx: watch::Receiver<u64>,
) {
    let mut slot = SlotInfo::new();
    let mut worker: Option<Box<dyn WorkerHandle>> = None;
    // Reload generation this worker was booted at. When the pool's generation
    // advances past this, the worker must be recycled to pick up new code.
    let mut boot_generation: u64 = *reload_rx.borrow();

    loop {
        // Spawn a worker if we don't have one.
        if worker.is_none() {
            boot_generation = *reload_rx.borrow();
            match boot_worker(&runtime, &config, &mut slot).await {
                Ok(w) => worker = Some(w),
                Err(e) => {
                    error!(slot_id, error = ?e, "failed to boot worker, will retry");
                    tokio::time::sleep(Duration::from_secs(1)).await;
                    continue;
                },
            }
        }

        let recyclable = worker.as_ref().is_some_and(|w| w.is_recyclable());

        // Wait for a request, a reload signal, or shutdown.
        let req = tokio::select! {
            biased;
            // React to a reload while idle so workers restart promptly even
            // without traffic. Skip for non-recyclable workers (main thread).
            res = reload_rx.changed(), if recyclable => {
                if res.is_err() {
                    // Pool dropped the sender — treat as shutdown.
                    info!(slot_id, "supervisor shutting down (reload channel closed)");
                    if let Some(mut w) = worker.take() {
                        let _ = w.terminate().await;
                    }
                    return;
                }
                if *reload_rx.borrow() > boot_generation {
                    info!(slot_id, "recycling idle worker for hot reload");
                    if let Some(mut w) = worker.take() {
                        let _ = w.terminate().await;
                    }
                    slot = SlotInfo::new();
                }
                continue;
            },
            maybe_req = inbox.recv() => {
                let Some(req) = maybe_req else {
                    info!(slot_id, "supervisor shutting down (inbox closed)");
                    if let Some(mut w) = worker.take() {
                        if let Err(e) = w.terminate().await {
                            warn!(slot_id, error = ?e, "terminate error during shutdown");
                        }
                    }
                    return;
                };
                req
            },
        };

        // Dispatch.
        let Some(w) = worker.as_mut() else {
            unreachable!()
        };
        slot.mark_busy();
        let result = dispatch_one(
            w.as_mut(),
            &req.method,
            req.payload,
            req.request_id.clone(),
            config.exec_timeout,
        )
        .await;
        slot.mark_idle();

        // Send reply.
        let _ = req.reply.send(result.map_err(anyhow::Error::from));

        // Recycle on reload (after the request completes) or per the lifecycle
        // policies (max_jobs / ttl).
        let reload_pending = *reload_rx.borrow() > boot_generation;
        if reload_pending || slot.should_recycle(&config) {
            if let Some(ref w) = worker {
                if !w.is_recyclable() {
                    debug!(slot_id, "skipping recycle for non-recyclable worker");
                    continue;
                }
            }
            let reason = if reload_pending {
                "hot reload"
            } else {
                "lifecycle"
            };
            info!(
                slot_id,
                jobs = slot.jobs_handled,
                reason,
                "recycling worker"
            );
            if let Some(mut w) = worker.take() {
                let _ = w.terminate().await;
            }
            slot = SlotInfo::new();
        }
    }
}

async fn boot_worker(
    runtime: &Arc<dyn Runtime>,
    config: &WorkersConfig,
    slot: &mut SlotInfo,
) -> Result<Box<dyn WorkerHandle>> {
    debug!("boot_worker: spawning");
    let mut handle = runtime.spawn().await.context("spawn")?;
    debug!(id = handle.id(), "boot_worker: waiting for ready");

    let timeout = tokio::time::timeout(config.boot_timeout, handle.ready());
    match timeout.await {
        Ok(Ok(())) => {
            let id = handle.id();
            slot.mark_ready(id);
            debug!(id, "worker ready");
            Ok(handle)
        },
        Ok(Err(e)) => {
            let _ = handle.terminate().await;
            Err(e).context("worker ready() failed during boot")
        },
        Err(_) => {
            let _ = handle.terminate().await;
            anyhow::bail!("worker boot timed out after {:?}", config.boot_timeout)
        },
    }
}

/// Run a single request on `worker`, bounded by `exec_timeout`.
///
/// An elapsed deadline maps to [`WorkError::Timeout`]. A failure reported by
/// the worker maps to [`WorkError::Internal`] carrying that error's display
/// text.
async fn dispatch_one(
    worker: &mut dyn WorkerHandle,
    method: &str,
    payload: serde_json::Value,
    request_id: Arc<str>,
    exec_timeout: Duration,
) -> Result<serde_json::Value, WorkError> {
    let execution = worker.execute(method, payload, request_id);
    let Ok(outcome) = tokio::time::timeout(exec_timeout, execution).await else {
        return Err(WorkError::Timeout);
    };
    outcome.map_err(|err| WorkError::Internal(err.to_string()))
}