Skip to main content

folk_core/
worker_pool.rs

1//! Worker pool: dispatches requests to PHP workers, manages slot lifecycle.
2//!
3//! See `folk-spec/spec/03-worker-lifecycle.md` for the design.
4
5use std::sync::Arc;
6use std::time::Duration;
7
8use anyhow::{Context, Result, anyhow};
9use async_trait::async_trait;
10use bytes::Bytes;
11use folk_api::Executor;
12use tokio::sync::{Semaphore, mpsc, oneshot};
13use tokio::task::JoinHandle;
14use tracing::{debug, error, info, warn};
15
16use crate::config::WorkersConfig;
17use crate::runtime::{Runtime, WorkerHandle};
18use crate::worker_slot::SlotInfo;
19
/// Pool errors. Plugins typically translate these into their own domain errors.
///
/// Within this file only [`WorkError::Timeout`] and [`WorkError::Internal`]
/// are constructed (both by `dispatch_one`); the remaining variants are part
/// of the public error surface.
#[derive(Debug, thiserror::Error)]
pub enum WorkError {
    /// No worker could take the request.
    /// NOTE(review): not constructed anywhere in this file — confirm where it is raised.
    #[error("all workers busy")]
    Busy,
    /// The worker went away while a request was in flight.
    /// NOTE(review): not constructed anywhere in this file — confirm where it is raised.
    #[error("worker died during request")]
    WorkerDied,
    /// The worker did not answer within the configured execution timeout.
    #[error("execution timed out")]
    Timeout,
    /// The worker answered with an application-level failure.
    /// Only `message` appears in the rendered error text; `code` is carried
    /// as data for callers that match on the variant.
    #[error("worker returned application error: {message}")]
    Application { code: i32, message: String },
    /// Any other failure, carried as its string rendering.
    #[error("internal error: {0}")]
    Internal(String),
}
34
/// One dispatch request: method name, payload + reply channel.
///
/// Created by `WorkerPool::dispatch_value`, forwarded by the pool task to a
/// slot inbox, and consumed by a slot supervisor.
struct DispatchRequest {
    /// Name of the method the worker should execute.
    method: String,
    /// JSON payload handed to the worker together with `method`.
    payload: serde_json::Value,
    /// One-shot channel on which the slot supervisor sends the outcome back
    /// to the waiting caller.
    reply: oneshot::Sender<Result<serde_json::Value>>,
}
41
/// Worker pool — the dispatch surface.
///
/// Callers go through the `Executor` implementation; requests are handed to a
/// background pool task over a channel.
pub struct WorkerPool {
    /// Sending side of the channel read by the pool task.
    request_tx: mpsc::Sender<DispatchRequest>,
    /// Created with `config.count` permits; one permit is held for the whole
    /// duration of each dispatch, bounding the number of in-flight requests.
    semaphore: Arc<Semaphore>,
    /// Handle of the spawned pool task. It is stored but never awaited in
    /// this file.
    _pool_task: JoinHandle<()>,
}
48
49impl WorkerPool {
50    /// Construct a pool with `config.count` workers spawned via `runtime`.
51    ///
52    /// Returns once the pool task is started. Workers boot asynchronously
53    /// in the background.
54    pub fn new(runtime: Arc<dyn Runtime>, config: WorkersConfig) -> Result<Arc<Self>> {
55        let semaphore = Arc::new(Semaphore::new(config.count));
56        let (request_tx, request_rx) = mpsc::channel::<DispatchRequest>(1024);
57
58        let pool_task = tokio::spawn(pool_main(runtime, config, request_rx, semaphore.clone()));
59
60        Ok(Arc::new(Self {
61            request_tx,
62            semaphore,
63            _pool_task: pool_task,
64        }))
65    }
66
67    /// Dispatch a Value-based request through the pool.
68    async fn dispatch_value(
69        &self,
70        method: &str,
71        payload: serde_json::Value,
72    ) -> Result<serde_json::Value> {
73        let permit = self
74            .semaphore
75            .clone()
76            .acquire_owned()
77            .await
78            .context("pool semaphore closed")?;
79
80        let (reply_tx, reply_rx) = oneshot::channel();
81        self.request_tx
82            .send(DispatchRequest {
83                method: method.to_string(),
84                payload,
85                reply: reply_tx,
86            })
87            .await
88            .map_err(|_| anyhow!("pool task gone"))?;
89
90        let result = reply_rx
91            .await
92            .map_err(|_| anyhow!("pool dropped reply channel"))?;
93
94        drop(permit);
95        result
96    }
97}
98
99#[async_trait]
100impl Executor for WorkerPool {
101    async fn execute_method(&self, method: &str, payload: Bytes) -> Result<Bytes> {
102        debug!(
103            method,
104            payload_len = payload.len(),
105            "pool: execute_method called (bytes path)"
106        );
107        // Legacy path: parse JSON bytes → Value → dispatch → Value → serialize
108        let value: serde_json::Value =
109            serde_json::from_slice(&payload).context("pool: failed to parse payload as JSON")?;
110        let result = self.dispatch_value(method, value).await?;
111        let bytes = serde_json::to_vec(&result).context("pool: failed to serialize response")?;
112        Ok(Bytes::from(bytes))
113    }
114
115    async fn execute_value(
116        &self,
117        method: &str,
118        payload: serde_json::Value,
119    ) -> Result<serde_json::Value> {
120        debug!(method, "pool: execute_value called (zero-copy path)");
121        self.dispatch_value(method, payload).await
122    }
123}
124
125// ---- Pool task ---------------------------------------------------------------
126
127async fn pool_main(
128    runtime: Arc<dyn Runtime>,
129    config: WorkersConfig,
130    mut request_rx: mpsc::Receiver<DispatchRequest>,
131    _semaphore: Arc<Semaphore>,
132) {
133    let mut slot_inboxes: Vec<mpsc::Sender<DispatchRequest>> = Vec::with_capacity(config.count);
134    let mut slot_supervisors: Vec<JoinHandle<()>> = Vec::with_capacity(config.count);
135
136    for slot_id in 0..config.count {
137        let (slot_tx, slot_rx) = mpsc::channel::<DispatchRequest>(8);
138        slot_inboxes.push(slot_tx);
139        let runtime_clone = runtime.clone();
140        let cfg_clone = config.clone();
141        let supervisor = tokio::spawn(slot_supervisor(slot_id, runtime_clone, cfg_clone, slot_rx));
142        slot_supervisors.push(supervisor);
143    }
144
145    // Round-robin dispatch.
146    let mut next: usize = 0;
147    while let Some(req) = request_rx.recv().await {
148        let chosen = next % slot_inboxes.len();
149        next = next.wrapping_add(1);
150
151        if slot_inboxes[chosen].send(req).await.is_err() {
152            warn!(slot_id = chosen, "slot inbox closed; failed to dispatch");
153        }
154    }
155
156    info!("pool main loop exiting; awaiting supervisors");
157    for handle in slot_supervisors {
158        let _ = handle.await;
159    }
160}
161
162// ---- Slot supervisor ---------------------------------------------------------
163
164async fn slot_supervisor(
165    slot_id: usize,
166    runtime: Arc<dyn Runtime>,
167    config: WorkersConfig,
168    mut inbox: mpsc::Receiver<DispatchRequest>,
169) {
170    let mut slot = SlotInfo::new();
171    let mut worker: Option<Box<dyn WorkerHandle>> = None;
172
173    loop {
174        // Spawn a worker if we don't have one.
175        if worker.is_none() {
176            match boot_worker(&runtime, &config, &mut slot).await {
177                Ok(w) => worker = Some(w),
178                Err(e) => {
179                    error!(slot_id, error = ?e, "failed to boot worker, will retry");
180                    tokio::time::sleep(Duration::from_secs(1)).await;
181                    continue;
182                },
183            }
184        }
185
186        let Some(w) = worker.as_mut() else {
187            unreachable!()
188        };
189
190        // Wait for a request or for shutdown.
191        let Some(req) = inbox.recv().await else {
192            info!(slot_id, "supervisor shutting down (inbox closed)");
193            if let Err(e) = w.terminate().await {
194                warn!(slot_id, error = ?e, "terminate error during shutdown");
195            }
196            return;
197        };
198
199        // Dispatch.
200        slot.mark_busy();
201        let result = dispatch_one(w.as_mut(), &req.method, req.payload, config.exec_timeout).await;
202        slot.mark_idle();
203
204        // Send reply.
205        let _ = req.reply.send(result.map_err(anyhow::Error::from));
206
207        // Recycle?
208        if slot.should_recycle(&config) {
209            info!(slot_id, jobs = slot.jobs_handled, "recycling worker");
210            if let Some(mut w) = worker.take() {
211                let _ = w.terminate().await;
212            }
213            slot = SlotInfo::new();
214        }
215    }
216}
217
218async fn boot_worker(
219    runtime: &Arc<dyn Runtime>,
220    config: &WorkersConfig,
221    slot: &mut SlotInfo,
222) -> Result<Box<dyn WorkerHandle>> {
223    debug!("boot_worker: spawning");
224    let mut handle = runtime.spawn().await.context("spawn")?;
225    debug!(id = handle.id(), "boot_worker: waiting for ready");
226
227    let timeout = tokio::time::timeout(config.boot_timeout, handle.ready());
228    match timeout.await {
229        Ok(Ok(())) => {
230            let id = handle.id();
231            slot.mark_ready(id);
232            debug!(id, "worker ready");
233            Ok(handle)
234        },
235        Ok(Err(e)) => {
236            let _ = handle.terminate().await;
237            Err(e).context("worker ready() failed during boot")
238        },
239        Err(_) => {
240            let _ = handle.terminate().await;
241            anyhow::bail!("worker boot timed out after {:?}", config.boot_timeout)
242        },
243    }
244}
245
246async fn dispatch_one(
247    worker: &mut dyn WorkerHandle,
248    method: &str,
249    payload: serde_json::Value,
250    exec_timeout: Duration,
251) -> Result<serde_json::Value, WorkError> {
252    let recv = tokio::time::timeout(exec_timeout, worker.execute(method, payload));
253    match recv.await {
254        Ok(Ok(result)) => Ok(result),
255        Ok(Err(e)) => Err(WorkError::Internal(e.to_string())),
256        Err(_) => Err(WorkError::Timeout),
257    }
258}