1use crate::{
2 error::{FaucetError, FaucetResult},
3 leak,
4 networking::get_available_socket,
5 server::{
6 logging::{parse_faucet_event, FaucetEventResult},
7 FaucetServerConfig,
8 },
9 shutdown::ShutdownSignal,
10 telemetry::send_log_event,
11};
12use std::{
13 ffi::OsStr,
14 net::SocketAddr,
15 path::Path,
16 sync::atomic::{AtomicBool, Ordering},
17 time::Duration,
18};
19use tokio::{
20 process::Child,
21 sync::{Mutex, Notify},
22 task::JoinHandle,
23};
24use tokio_stream::StreamExt;
25use tokio_util::codec::{FramedRead, LinesCodec};
26
27#[derive(Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Debug, serde::Deserialize)]
28pub enum WorkerType {
29 #[serde(alias = "plumber", alias = "Plumber")]
30 Plumber,
31 #[serde(alias = "shiny", alias = "Shiny")]
32 Shiny,
33 #[serde(alias = "quarto-shiny", alias = "QuartoShiny", alias = "quarto_shiny")]
34 QuartoShiny,
35 #[cfg(test)]
36 Dummy,
37}
38
39fn log_stdio(mut child: Child, target: &'static str) -> FaucetResult<Child> {
40 let pid = child.id().expect("Failed to get plumber worker PID");
41
42 let mut stdout = FramedRead::new(
43 child.stdout.take().ok_or(FaucetError::Unknown(format!(
44 "Unable to take stdout from PID {pid}"
45 )))?,
46 LinesCodec::new(),
47 );
48
49 let mut stderr = FramedRead::new(
50 child.stderr.take().ok_or(FaucetError::Unknown(format!(
51 "Unable to take stderr from PID {pid}"
52 )))?,
53 LinesCodec::new(),
54 );
55
56 tokio::spawn(async move {
57 while let Some(line) = stderr.next().await {
58 if let Ok(line) = line {
59 match parse_faucet_event(&line) {
60 FaucetEventResult::Output(line) => log::warn!(target: target, "{line}"),
61 FaucetEventResult::Event(e) => {
62 send_log_event(e);
63 }
64 FaucetEventResult::EventError(e) => {
65 log::error!(target: target, "{e:?}")
66 }
67 }
68 }
69 }
70 });
71
72 tokio::spawn(async move {
73 while let Some(line) = stdout.next().await {
74 if let Ok(line) = line {
75 log::info!(target: target, "{line}");
76 }
77 }
78 });
79
80 Ok(child)
81}
82
83#[derive(Copy, Clone)]
84pub struct WorkerConfig {
85 pub wtype: WorkerType,
86 pub app_dir: Option<&'static str>,
87 pub rscript: &'static OsStr,
88 pub quarto: &'static OsStr,
89 pub workdir: &'static Path,
90 pub addr: SocketAddr,
91 pub target: &'static str,
92 pub worker_id: usize,
93 pub worker_route: Option<&'static str>,
94 pub is_online: &'static AtomicBool,
95 pub qmd: Option<&'static Path>,
96 pub handle: &'static Mutex<Option<JoinHandle<FaucetResult<()>>>>,
97 pub shutdown: &'static ShutdownSignal,
98 pub idle_stop: &'static Notify,
99}
100
101impl WorkerConfig {
102 fn new(
103 worker_id: usize,
104 addr: SocketAddr,
105 server_config: &FaucetServerConfig,
106 shutdown: &'static ShutdownSignal,
107 ) -> Self {
108 Self {
109 addr,
110 worker_id,
111 is_online: leak!(AtomicBool::new(false)),
112 workdir: server_config.workdir,
113 worker_route: server_config.route,
114 target: leak!(format!("Worker::{}", worker_id)),
115 app_dir: server_config.app_dir,
116 wtype: server_config.server_type,
117 rscript: server_config.rscript,
118 quarto: server_config.quarto,
119 qmd: server_config.qmd,
120 handle: leak!(Mutex::new(None)),
121 shutdown,
122 idle_stop: leak!(Notify::new()),
123 }
124 }
125 #[allow(dead_code)]
126 #[cfg(test)]
127 pub fn dummy(target: &'static str, addr: &str, online: bool) -> WorkerConfig {
128 WorkerConfig {
129 target,
130 is_online: leak!(AtomicBool::new(online)),
131 addr: addr.parse().unwrap(),
132 app_dir: None,
133 worker_route: None,
134 rscript: OsStr::new(""),
135 wtype: WorkerType::Dummy,
136 worker_id: 1,
137 quarto: OsStr::new(""),
138 workdir: Path::new("."),
139 qmd: None,
140 handle: leak!(Mutex::new(None)),
141 shutdown: leak!(ShutdownSignal::new()),
142 idle_stop: leak!(Notify::new()),
143 }
144 }
145}
146
147fn spawn_child_rscript_process(
148 config: &WorkerConfig,
149 command: impl AsRef<str>,
150) -> FaucetResult<Child> {
151 let mut cmd = tokio::process::Command::new(config.rscript);
152
153 cmd.current_dir(config.workdir)
155 .arg("-e")
156 .arg(command.as_ref())
157 .stdin(std::process::Stdio::null())
158 .stdout(std::process::Stdio::piped())
159 .stderr(std::process::Stdio::piped())
160 .env("FAUCET_WORKER_ID", config.worker_id.to_string())
161 .kill_on_drop(true);
163
164 #[cfg(unix)]
165 unsafe {
166 cmd.pre_exec(|| {
167 nix::libc::setpgid(0, 0);
169 Ok(())
170 });
171 }
172
173 cmd.spawn().map_err(Into::into)
174}
175
176fn spawn_plumber_worker(config: &WorkerConfig) -> FaucetResult<Child> {
177 let command = format!(
178 r#"
179 options("plumber.port" = {port})
180 plumber::pr_run(plumber::plumb())
181 "#,
182 port = config.addr.port()
183 );
184 let child = spawn_child_rscript_process(config, command)?;
185
186 log_stdio(child, config.target)
187}
188
189fn spawn_shiny_worker(config: &WorkerConfig) -> FaucetResult<Child> {
190 let command = format!(
191 r###"
192 options("shiny.port" = {port})
193 options(shiny.http.response.filter = function(...) {{
194 response <- list(...)[[length(list(...))]]
195 if (response$status < 200 || response$status > 300) return(response)
196 if ('file' %in% names(response$content)) return(response)
197 if (!grepl("^text/html", response$content_type, perl = T)) return(response)
198 if (is.raw(response$content)) response$content <- rawToChar(response$content)
199 response$content <- sub("</head>", '<script src="__faucet__/reconnect.js"></script></head>', response$content, ignore.case = T)
200 return(response)
201 }})
202 shiny::runApp("{app_dir}")
203 "###,
204 port = config.addr.port(),
205 app_dir = config.app_dir.unwrap_or(".")
206 );
207 let child = spawn_child_rscript_process(config, command)?;
208
209 log_stdio(child, config.target)
210}
211
212fn spawn_quarto_shiny_worker(config: &WorkerConfig) -> FaucetResult<Child> {
213 let mut cmd = tokio::process::Command::new(config.quarto);
214 cmd.current_dir(config.workdir)
216 .arg("serve")
217 .args(["--port", config.addr.port().to_string().as_str()])
218 .arg(config.qmd.ok_or(FaucetError::MissingArgument("qmd"))?)
219 .stdin(std::process::Stdio::null())
220 .stdout(std::process::Stdio::piped())
221 .stderr(std::process::Stdio::piped())
222 .env("FAUCET_WORKER_ID", config.worker_id.to_string())
223 .kill_on_drop(true);
225
226 #[cfg(unix)]
227 unsafe {
228 cmd.pre_exec(|| {
229 nix::libc::setpgid(0, 0);
231 Ok(())
232 });
233 }
234
235 let child = cmd.spawn()?;
236
237 log_stdio(child, config.target)
238}
239
240impl WorkerConfig {
241 fn spawn_process(&self) -> FaucetResult<Child> {
242 let child_result = match self.wtype {
243 WorkerType::Plumber => spawn_plumber_worker(self),
244 WorkerType::Shiny => spawn_shiny_worker(self),
245 WorkerType::QuartoShiny => spawn_quarto_shiny_worker(self),
246 #[cfg(test)]
247 WorkerType::Dummy => unreachable!(
248 "WorkerType::Dummy should be handled in spawn_worker_task and not reach spawn_process"
249 ),
250 };
251
252 match child_result {
253 Ok(child) => Ok(child),
254 Err(e) => {
255 log::error!(target: "faucet", "Failed to invoke R for {target}: {e}", target = self.target);
256 Err(e)
257 }
258 }
259 }
260 pub async fn wait_until_done(&self) {
261 if let Some(handle) = self.handle.lock().await.take() {
262 log::debug!("Waiting for process to be finished");
263 match handle.await {
264 Ok(Ok(_)) => {
265 log::debug!("Task ended successfully!")
266 }
267 Ok(Err(e)) => {
268 panic!("Worker task for target '{}' failed: {:?}", self.target, e);
269 }
270 Err(e) => {
271 panic!(
272 "Worker task for target '{}' panicked or was cancelled: {:?}",
273 self.target, e
274 );
275 }
276 }
277 }
278 }
279 pub async fn spawn_worker_task(&'static self) {
280 let mut handle = self.handle.lock().await;
281
282 if let Some(handle) = handle.as_ref() {
283 if !handle.is_finished() {
284 log::warn!(target: "faucet", "Worker task for {target} is already running, skipping spawn", target = self.target);
285 return;
286 }
287 }
288
289 *handle = Some(tokio::spawn(async move {
290 #[cfg(test)]
291 if self.wtype == WorkerType::Dummy {
292 log::debug!(
293 target: "faucet",
294 "Worker {target} is type Dummy, skipping real process spawn.",
295 target = self.target
296 );
297 return FaucetResult::Ok(());
298 }
299
300 'outer: loop {
301 let mut child = match self.spawn_process() {
302 Ok(c) => c,
303 Err(e) => {
304 log::error!(
305 target: "faucet",
306 "Worker task for {target} failed to spawn initial process: {e}",
307 target = self.target
308 );
309 return Err(e);
310 }
311 };
312
313 let pid = match child.id() {
314 Some(id) => id,
315 None => {
316 let err_msg = format!(
317 "Spawned process for {target} has no PID",
318 target = self.target
319 );
320 log::error!(target: "faucet", "{err_msg}");
321 return Err(FaucetError::Unknown(err_msg));
322 }
323 };
324
325 let child_loop = async {
329 log::info!(target: "faucet", "Starting process {pid} for {target} on port {port}", port = self.addr.port(), target = self.target);
330 loop {
331 let check_status = check_if_online(self.addr).await;
333 if check_status {
335 log::info!(target: "faucet", "{target} is online and ready to serve connections at {route}", target = self.target, route = self.worker_route.unwrap_or("/"));
336 self.is_online.store(check_status, Ordering::SeqCst);
337 break;
338 }
339 if child.try_wait()?.is_some() {
342 break;
343 }
344
345 tokio::time::sleep(RECHECK_INTERVAL).await;
346 }
347 FaucetResult::Ok(child.wait().await?)
348 };
349 tokio::select! {
350 _ = self.shutdown.wait() => {
353 let _ = child.kill().await;
354 log::info!(target: "faucet", "{target}'s process ({pid}) killed for shutdown", target = self.target);
355 break 'outer;
356 },
357 _ = self.idle_stop.notified() => {
358 self.is_online.store(false, std::sync::atomic::Ordering::SeqCst);
359 let _ = child.kill().await;
360 log::info!(target: "faucet", "{target}'s process ({pid}) killed for idle stop", target = self.target);
361 break 'outer;
362 },
363 status = child_loop => {
365 self
366 .is_online
367 .store(false, std::sync::atomic::Ordering::SeqCst);
368 log::error!(target: "faucet", "{target}'s process ({}) exited with status {}", pid, status?, target = self.target);
369 continue 'outer;
370 }
371 }
372 }
373 log::debug!("{target}'s process has ended.", target = self.target);
374 FaucetResult::Ok(())
375 }));
376 }
377}
378
379async fn check_if_online(addr: SocketAddr) -> bool {
380 let stream = tokio::net::TcpStream::connect(addr).await;
381 stream.is_ok()
382}
383
/// Pause between two consecutive readiness probes of a starting worker.
const RECHECK_INTERVAL: Duration = Duration::from_millis(250);
385
386pub struct WorkerConfigs {
387 pub workers: Box<[&'static WorkerConfig]>,
388}
389
390const TRIES: usize = 20;
391
392impl WorkerConfigs {
393 pub(crate) async fn new(
394 server_config: FaucetServerConfig,
395 shutdown: &'static ShutdownSignal,
396 ) -> FaucetResult<Self> {
397 let mut workers =
398 Vec::<&'static WorkerConfig>::with_capacity(server_config.n_workers.get());
399
400 for id in 0..server_config.n_workers.get() {
401 let socket_addr = 'find_socket: loop {
404 let addr_candidate = get_available_socket(TRIES).await?;
405 if workers.iter().any(|w| w.addr == addr_candidate) {
407 continue 'find_socket;
408 }
409 break 'find_socket addr_candidate;
410 };
411
412 let config = leak!(WorkerConfig::new(
413 id + 1,
414 socket_addr,
415 &server_config,
416 shutdown
417 )) as &'static WorkerConfig;
418 workers.push(config);
419 }
420
421 let workers = workers.into_boxed_slice();
422
423 Ok(Self { workers })
424 }
425}