1use std::sync::Arc;
6use std::sync::atomic::{AtomicU64, Ordering};
7use std::time::Duration;
8
9use anyhow::{Context, Result, anyhow};
10use async_trait::async_trait;
11use bytes::Bytes;
12use folk_api::Executor;
13use tokio::sync::{Semaphore, mpsc, oneshot, watch};
14use tokio::task::JoinHandle;
15use tracing::{debug, error, info, warn};
16
17use crate::config::WorkersConfig;
18use crate::runtime::{Runtime, WorkerHandle};
19use crate::worker_slot::SlotInfo;
20
/// Failure categories for a single unit of work sent to a worker.
///
/// Within the portion of the file shown here only `Timeout` and `Internal`
/// are constructed (by `dispatch_one`); the remaining variants are declared
/// for use elsewhere.
#[derive(Debug, thiserror::Error)]
pub enum WorkError {
    /// Displayed as "all workers busy".
    #[error("all workers busy")]
    Busy,
    /// Displayed as "worker died during request".
    #[error("worker died during request")]
    WorkerDied,
    /// The worker did not answer within the execution timeout.
    #[error("execution timed out")]
    Timeout,
    /// An error reported by the worker's application code.
    /// Note that only `message` appears in the rendered text; `code` does not.
    #[error("worker returned application error: {message}")]
    Application { code: i32, message: String },
    /// Any other failure, carried as its string rendering.
    #[error("internal error: {0}")]
    Internal(String),
}
35
/// One queued call travelling from `WorkerPool::dispatch_value` through
/// `pool_main` to a slot supervisor.
struct DispatchRequest {
    /// Identifier taken from the pool's `next_request_id` counter.
    request_id: u64,
    /// Name of the method the worker should execute.
    method: String,
    /// JSON payload handed to the worker.
    payload: serde_json::Value,
    /// One-shot channel on which the supervisor sends the outcome back to the caller.
    reply: oneshot::Sender<Result<serde_json::Value>>,
}
43
/// Handle to a pool of workers driven by a background `pool_main` task.
pub struct WorkerPool {
    /// Queue feeding requests to the `pool_main` task.
    request_tx: mpsc::Sender<DispatchRequest>,
    /// Created with `config.count` permits; a dispatch holds one permit for its whole duration.
    semaphore: Arc<Semaphore>,
    /// Source of request identifiers; starts at 1.
    next_request_id: AtomicU64,
    /// Runtime used to spawn workers and, on reload, to invalidate its caches.
    runtime: Arc<dyn Runtime>,
    /// Reload generation counter observed by every slot supervisor.
    reload_tx: watch::Sender<u64>,
    /// Join handle of the spawned `pool_main` task; stored but not awaited in the code shown here.
    _pool_task: JoinHandle<()>,
}
56
57impl WorkerPool {
58 pub fn new(runtime: Arc<dyn Runtime>, config: WorkersConfig) -> Result<Arc<Self>> {
63 let semaphore = Arc::new(Semaphore::new(config.count));
64 let (request_tx, request_rx) = mpsc::channel::<DispatchRequest>(1024);
65 let (reload_tx, reload_rx) = watch::channel(0u64);
66
67 let pool_task = tokio::spawn(pool_main(
68 runtime.clone(),
69 config,
70 request_rx,
71 semaphore.clone(),
72 reload_rx,
73 ));
74
75 Ok(Arc::new(Self {
76 request_tx,
77 semaphore,
78 next_request_id: AtomicU64::new(1),
79 runtime,
80 reload_tx,
81 _pool_task: pool_task,
82 }))
83 }
84
85 pub async fn trigger_reload(&self) {
91 if let Err(e) = self.runtime.reload().await {
92 warn!(error = %e, "reload: cache invalidation failed; recycling anyway");
93 }
94 self.reload_tx.send_modify(|g| *g += 1);
95 let generation = *self.reload_tx.borrow();
96 info!(generation, "hot reload triggered; recycling workers");
97 }
98
99 async fn dispatch_value(
101 &self,
102 method: &str,
103 payload: serde_json::Value,
104 ) -> Result<serde_json::Value> {
105 let permit = self
106 .semaphore
107 .clone()
108 .acquire_owned()
109 .await
110 .context("pool semaphore closed")?;
111
112 let request_id = self.next_request_id.fetch_add(1, Ordering::Relaxed);
113 let (reply_tx, reply_rx) = oneshot::channel();
114 self.request_tx
115 .send(DispatchRequest {
116 request_id,
117 method: method.to_string(),
118 payload,
119 reply: reply_tx,
120 })
121 .await
122 .map_err(|_| anyhow!("pool task gone"))?;
123
124 let result = reply_rx
125 .await
126 .map_err(|_| anyhow!("pool dropped reply channel"))?;
127
128 drop(permit);
129 result
130 }
131}
132
133#[async_trait]
134impl Executor for WorkerPool {
135 async fn execute_method(&self, method: &str, payload: Bytes) -> Result<Bytes> {
136 debug!(
137 method,
138 payload_len = payload.len(),
139 "pool: execute_method called (bytes path)"
140 );
141 let value: serde_json::Value =
143 serde_json::from_slice(&payload).context("pool: failed to parse payload as JSON")?;
144 let result = self.dispatch_value(method, value).await?;
145 let bytes = serde_json::to_vec(&result).context("pool: failed to serialize response")?;
146 Ok(Bytes::from(bytes))
147 }
148
149 async fn execute_value(
150 &self,
151 method: &str,
152 payload: serde_json::Value,
153 ) -> Result<serde_json::Value> {
154 debug!(method, "pool: execute_value called (zero-copy path)");
155 self.dispatch_value(method, payload).await
156 }
157}
158
159async fn pool_main(
162 runtime: Arc<dyn Runtime>,
163 config: WorkersConfig,
164 mut request_rx: mpsc::Receiver<DispatchRequest>,
165 _semaphore: Arc<Semaphore>,
166 reload_rx: watch::Receiver<u64>,
167) {
168 let mut slot_inboxes: Vec<mpsc::Sender<DispatchRequest>> = Vec::with_capacity(config.count);
169 let mut slot_supervisors: Vec<JoinHandle<()>> = Vec::with_capacity(config.count);
170
171 for slot_id in 0..config.count {
172 let (slot_tx, slot_rx) = mpsc::channel::<DispatchRequest>(8);
173 slot_inboxes.push(slot_tx);
174 let runtime_clone = runtime.clone();
175 let cfg_clone = config.clone();
176 let reload_clone = reload_rx.clone();
177 let supervisor = tokio::spawn(slot_supervisor(
178 slot_id,
179 runtime_clone,
180 cfg_clone,
181 slot_rx,
182 reload_clone,
183 ));
184 slot_supervisors.push(supervisor);
185 }
186
187 let mut next: usize = 0;
189 while let Some(req) = request_rx.recv().await {
190 let chosen = next % slot_inboxes.len();
191 next = next.wrapping_add(1);
192
193 if slot_inboxes[chosen].send(req).await.is_err() {
194 warn!(slot_id = chosen, "slot inbox closed; failed to dispatch");
195 }
196 }
197
198 info!("pool main loop exiting; awaiting supervisors");
199 for handle in slot_supervisors {
200 let _ = handle.await;
201 }
202}
203
204async fn slot_supervisor(
207 slot_id: usize,
208 runtime: Arc<dyn Runtime>,
209 config: WorkersConfig,
210 mut inbox: mpsc::Receiver<DispatchRequest>,
211 mut reload_rx: watch::Receiver<u64>,
212) {
213 let mut slot = SlotInfo::new();
214 let mut worker: Option<Box<dyn WorkerHandle>> = None;
215 let mut boot_generation: u64 = *reload_rx.borrow();
218
219 loop {
220 if worker.is_none() {
222 boot_generation = *reload_rx.borrow();
223 match boot_worker(&runtime, &config, &mut slot).await {
224 Ok(w) => worker = Some(w),
225 Err(e) => {
226 error!(slot_id, error = ?e, "failed to boot worker, will retry");
227 tokio::time::sleep(Duration::from_secs(1)).await;
228 continue;
229 },
230 }
231 }
232
233 let recyclable = worker.as_ref().is_some_and(|w| w.is_recyclable());
234
235 let req = tokio::select! {
237 biased;
238 res = reload_rx.changed(), if recyclable => {
241 if res.is_err() {
242 info!(slot_id, "supervisor shutting down (reload channel closed)");
244 if let Some(mut w) = worker.take() {
245 let _ = w.terminate().await;
246 }
247 return;
248 }
249 if *reload_rx.borrow() > boot_generation {
250 info!(slot_id, "recycling idle worker for hot reload");
251 if let Some(mut w) = worker.take() {
252 let _ = w.terminate().await;
253 }
254 slot = SlotInfo::new();
255 }
256 continue;
257 },
258 maybe_req = inbox.recv() => {
259 let Some(req) = maybe_req else {
260 info!(slot_id, "supervisor shutting down (inbox closed)");
261 if let Some(mut w) = worker.take() {
262 if let Err(e) = w.terminate().await {
263 warn!(slot_id, error = ?e, "terminate error during shutdown");
264 }
265 }
266 return;
267 };
268 req
269 },
270 };
271
272 let Some(w) = worker.as_mut() else {
274 unreachable!()
275 };
276 slot.mark_busy();
277 let result = dispatch_one(
278 w.as_mut(),
279 &req.method,
280 req.payload,
281 req.request_id,
282 config.exec_timeout,
283 )
284 .await;
285 slot.mark_idle();
286
287 let _ = req.reply.send(result.map_err(anyhow::Error::from));
289
290 let reload_pending = *reload_rx.borrow() > boot_generation;
293 if reload_pending || slot.should_recycle(&config) {
294 if let Some(ref w) = worker {
295 if !w.is_recyclable() {
296 debug!(slot_id, "skipping recycle for non-recyclable worker");
297 continue;
298 }
299 }
300 let reason = if reload_pending {
301 "hot reload"
302 } else {
303 "lifecycle"
304 };
305 info!(
306 slot_id,
307 jobs = slot.jobs_handled,
308 reason,
309 "recycling worker"
310 );
311 if let Some(mut w) = worker.take() {
312 let _ = w.terminate().await;
313 }
314 slot = SlotInfo::new();
315 }
316 }
317}
318
319async fn boot_worker(
320 runtime: &Arc<dyn Runtime>,
321 config: &WorkersConfig,
322 slot: &mut SlotInfo,
323) -> Result<Box<dyn WorkerHandle>> {
324 debug!("boot_worker: spawning");
325 let mut handle = runtime.spawn().await.context("spawn")?;
326 debug!(id = handle.id(), "boot_worker: waiting for ready");
327
328 let timeout = tokio::time::timeout(config.boot_timeout, handle.ready());
329 match timeout.await {
330 Ok(Ok(())) => {
331 let id = handle.id();
332 slot.mark_ready(id);
333 debug!(id, "worker ready");
334 Ok(handle)
335 },
336 Ok(Err(e)) => {
337 let _ = handle.terminate().await;
338 Err(e).context("worker ready() failed during boot")
339 },
340 Err(_) => {
341 let _ = handle.terminate().await;
342 anyhow::bail!("worker boot timed out after {:?}", config.boot_timeout)
343 },
344 }
345}
346
347async fn dispatch_one(
348 worker: &mut dyn WorkerHandle,
349 method: &str,
350 payload: serde_json::Value,
351 request_id: u64,
352 exec_timeout: Duration,
353) -> Result<serde_json::Value, WorkError> {
354 let recv = tokio::time::timeout(exec_timeout, worker.execute(method, payload, request_id));
355 match recv.await {
356 Ok(Ok(result)) => Ok(result),
357 Ok(Err(e)) => Err(WorkError::Internal(e.to_string())),
358 Err(_) => Err(WorkError::Timeout),
359 }
360}