1use std::sync::Arc;
6use std::time::Duration;
7
8use anyhow::{Context, Result, anyhow};
9use async_trait::async_trait;
10use bytes::Bytes;
11use folk_api::Executor;
12use tokio::sync::{Semaphore, mpsc, oneshot};
13use tokio::task::JoinHandle;
14use tracing::{debug, error, info, warn};
15
16use crate::config::WorkersConfig;
17use crate::runtime::{Runtime, WorkerHandle};
18use crate::worker_slot::SlotInfo;
19
20#[derive(Debug, thiserror::Error)]
22pub enum WorkError {
23 #[error("all workers busy")]
24 Busy,
25 #[error("worker died during request")]
26 WorkerDied,
27 #[error("execution timed out")]
28 Timeout,
29 #[error("worker returned application error: {message}")]
30 Application { code: i32, message: String },
31 #[error("internal error: {0}")]
32 Internal(String),
33}
34
35struct DispatchRequest {
37 method: String,
38 payload: serde_json::Value,
39 reply: oneshot::Sender<Result<serde_json::Value>>,
40}
41
42pub struct WorkerPool {
44 request_tx: mpsc::Sender<DispatchRequest>,
45 semaphore: Arc<Semaphore>,
46 _pool_task: JoinHandle<()>,
47}
48
49impl WorkerPool {
50 pub fn new(runtime: Arc<dyn Runtime>, config: WorkersConfig) -> Result<Arc<Self>> {
55 let semaphore = Arc::new(Semaphore::new(config.count));
56 let (request_tx, request_rx) = mpsc::channel::<DispatchRequest>(1024);
57
58 let pool_task = tokio::spawn(pool_main(runtime, config, request_rx, semaphore.clone()));
59
60 Ok(Arc::new(Self {
61 request_tx,
62 semaphore,
63 _pool_task: pool_task,
64 }))
65 }
66
67 async fn dispatch_value(
69 &self,
70 method: &str,
71 payload: serde_json::Value,
72 ) -> Result<serde_json::Value> {
73 let permit = self
74 .semaphore
75 .clone()
76 .acquire_owned()
77 .await
78 .context("pool semaphore closed")?;
79
80 let (reply_tx, reply_rx) = oneshot::channel();
81 self.request_tx
82 .send(DispatchRequest {
83 method: method.to_string(),
84 payload,
85 reply: reply_tx,
86 })
87 .await
88 .map_err(|_| anyhow!("pool task gone"))?;
89
90 let result = reply_rx
91 .await
92 .map_err(|_| anyhow!("pool dropped reply channel"))?;
93
94 drop(permit);
95 result
96 }
97}
98
99#[async_trait]
100impl Executor for WorkerPool {
101 async fn execute_method(&self, method: &str, payload: Bytes) -> Result<Bytes> {
102 debug!(
103 method,
104 payload_len = payload.len(),
105 "pool: execute_method called (bytes path)"
106 );
107 let value: serde_json::Value =
109 serde_json::from_slice(&payload).context("pool: failed to parse payload as JSON")?;
110 let result = self.dispatch_value(method, value).await?;
111 let bytes = serde_json::to_vec(&result).context("pool: failed to serialize response")?;
112 Ok(Bytes::from(bytes))
113 }
114
115 async fn execute_value(
116 &self,
117 method: &str,
118 payload: serde_json::Value,
119 ) -> Result<serde_json::Value> {
120 debug!(method, "pool: execute_value called (zero-copy path)");
121 self.dispatch_value(method, payload).await
122 }
123}
124
125async fn pool_main(
128 runtime: Arc<dyn Runtime>,
129 config: WorkersConfig,
130 mut request_rx: mpsc::Receiver<DispatchRequest>,
131 _semaphore: Arc<Semaphore>,
132) {
133 let mut slot_inboxes: Vec<mpsc::Sender<DispatchRequest>> = Vec::with_capacity(config.count);
134 let mut slot_supervisors: Vec<JoinHandle<()>> = Vec::with_capacity(config.count);
135
136 for slot_id in 0..config.count {
137 let (slot_tx, slot_rx) = mpsc::channel::<DispatchRequest>(8);
138 slot_inboxes.push(slot_tx);
139 let runtime_clone = runtime.clone();
140 let cfg_clone = config.clone();
141 let supervisor = tokio::spawn(slot_supervisor(slot_id, runtime_clone, cfg_clone, slot_rx));
142 slot_supervisors.push(supervisor);
143 }
144
145 let mut next: usize = 0;
147 while let Some(req) = request_rx.recv().await {
148 let chosen = next % slot_inboxes.len();
149 next = next.wrapping_add(1);
150
151 if slot_inboxes[chosen].send(req).await.is_err() {
152 warn!(slot_id = chosen, "slot inbox closed; failed to dispatch");
153 }
154 }
155
156 info!("pool main loop exiting; awaiting supervisors");
157 for handle in slot_supervisors {
158 let _ = handle.await;
159 }
160}
161
162async fn slot_supervisor(
165 slot_id: usize,
166 runtime: Arc<dyn Runtime>,
167 config: WorkersConfig,
168 mut inbox: mpsc::Receiver<DispatchRequest>,
169) {
170 let mut slot = SlotInfo::new();
171 let mut worker: Option<Box<dyn WorkerHandle>> = None;
172
173 loop {
174 if worker.is_none() {
176 match boot_worker(&runtime, &config, &mut slot).await {
177 Ok(w) => worker = Some(w),
178 Err(e) => {
179 error!(slot_id, error = ?e, "failed to boot worker, will retry");
180 tokio::time::sleep(Duration::from_secs(1)).await;
181 continue;
182 },
183 }
184 }
185
186 let Some(w) = worker.as_mut() else {
187 unreachable!()
188 };
189
190 let Some(req) = inbox.recv().await else {
192 info!(slot_id, "supervisor shutting down (inbox closed)");
193 if let Err(e) = w.terminate().await {
194 warn!(slot_id, error = ?e, "terminate error during shutdown");
195 }
196 return;
197 };
198
199 slot.mark_busy();
201 let result = dispatch_one(w.as_mut(), &req.method, req.payload, config.exec_timeout).await;
202 slot.mark_idle();
203
204 let _ = req.reply.send(result.map_err(anyhow::Error::from));
206
207 if slot.should_recycle(&config) {
209 info!(slot_id, jobs = slot.jobs_handled, "recycling worker");
210 if let Some(mut w) = worker.take() {
211 let _ = w.terminate().await;
212 }
213 slot = SlotInfo::new();
214 }
215 }
216}
217
218async fn boot_worker(
219 runtime: &Arc<dyn Runtime>,
220 config: &WorkersConfig,
221 slot: &mut SlotInfo,
222) -> Result<Box<dyn WorkerHandle>> {
223 debug!("boot_worker: spawning");
224 let mut handle = runtime.spawn().await.context("spawn")?;
225 debug!(id = handle.id(), "boot_worker: waiting for ready");
226
227 let timeout = tokio::time::timeout(config.boot_timeout, handle.ready());
228 match timeout.await {
229 Ok(Ok(())) => {
230 let id = handle.id();
231 slot.mark_ready(id);
232 debug!(id, "worker ready");
233 Ok(handle)
234 },
235 Ok(Err(e)) => {
236 let _ = handle.terminate().await;
237 Err(e).context("worker ready() failed during boot")
238 },
239 Err(_) => {
240 let _ = handle.terminate().await;
241 anyhow::bail!("worker boot timed out after {:?}", config.boot_timeout)
242 },
243 }
244}
245
246async fn dispatch_one(
247 worker: &mut dyn WorkerHandle,
248 method: &str,
249 payload: serde_json::Value,
250 exec_timeout: Duration,
251) -> Result<serde_json::Value, WorkError> {
252 let recv = tokio::time::timeout(exec_timeout, worker.execute(method, payload));
253 match recv.await {
254 Ok(Ok(result)) => Ok(result),
255 Ok(Err(e)) => Err(WorkError::Internal(e.to_string())),
256 Err(_) => Err(WorkError::Timeout),
257 }
258}