// faucet_server/client/worker.rs

1use crate::{
2    error::{FaucetError, FaucetResult},
3    leak,
4    networking::get_available_socket,
5    server::{
6        logging::{parse_faucet_event, FaucetEventResult},
7        FaucetServerConfig,
8    },
9    shutdown::ShutdownSignal,
10    telemetry::send_log_event,
11};
12use std::{
13    ffi::OsStr,
14    net::SocketAddr,
15    path::Path,
16    sync::atomic::{AtomicBool, Ordering},
17    time::Duration,
18};
19use tokio::{
20    process::Child,
21    sync::{Mutex, Notify},
22    task::JoinHandle,
23};
24use tokio_stream::StreamExt;
25use tokio_util::codec::{FramedRead, LinesCodec};
26
27#[derive(Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Debug, serde::Deserialize)]
28pub enum WorkerType {
29    #[serde(alias = "plumber", alias = "Plumber")]
30    Plumber,
31    #[serde(alias = "shiny", alias = "Shiny")]
32    Shiny,
33    #[serde(alias = "quarto-shiny", alias = "QuartoShiny", alias = "quarto_shiny")]
34    QuartoShiny,
35    #[cfg(test)]
36    Dummy,
37}
38
39fn log_stdio(mut child: Child, target: &'static str) -> FaucetResult<Child> {
40    let pid = child.id().expect("Failed to get plumber worker PID");
41
42    let mut stdout = FramedRead::new(
43        child.stdout.take().ok_or(FaucetError::Unknown(format!(
44            "Unable to take stdout from PID {pid}"
45        )))?,
46        LinesCodec::new(),
47    );
48
49    let mut stderr = FramedRead::new(
50        child.stderr.take().ok_or(FaucetError::Unknown(format!(
51            "Unable to take stderr from PID {pid}"
52        )))?,
53        LinesCodec::new(),
54    );
55
56    tokio::spawn(async move {
57        while let Some(line) = stderr.next().await {
58            if let Ok(line) = line {
59                match parse_faucet_event(&line) {
60                    FaucetEventResult::Output(line) => log::warn!(target: target, "{line}"),
61                    FaucetEventResult::Event(e) => {
62                        send_log_event(e);
63                    }
64                    FaucetEventResult::EventError(e) => {
65                        log::error!(target: target, "{e:?}")
66                    }
67                }
68            }
69        }
70    });
71
72    tokio::spawn(async move {
73        while let Some(line) = stdout.next().await {
74            if let Ok(line) = line {
75                log::info!(target: target, "{line}");
76            }
77        }
78    });
79
80    Ok(child)
81}
82
83#[derive(Copy, Clone)]
84pub struct WorkerConfig {
85    pub wtype: WorkerType,
86    pub app_dir: Option<&'static str>,
87    pub rscript: &'static OsStr,
88    pub quarto: &'static OsStr,
89    pub workdir: &'static Path,
90    pub addr: SocketAddr,
91    pub target: &'static str,
92    pub worker_id: usize,
93    pub worker_route: Option<&'static str>,
94    pub is_online: &'static AtomicBool,
95    pub qmd: Option<&'static Path>,
96    pub handle: &'static Mutex<Option<JoinHandle<FaucetResult<()>>>>,
97    pub shutdown: &'static ShutdownSignal,
98    pub idle_stop: &'static Notify,
99}
100
101impl WorkerConfig {
102    fn new(
103        worker_id: usize,
104        addr: SocketAddr,
105        server_config: &FaucetServerConfig,
106        shutdown: &'static ShutdownSignal,
107    ) -> Self {
108        Self {
109            addr,
110            worker_id,
111            is_online: leak!(AtomicBool::new(false)),
112            workdir: server_config.workdir,
113            worker_route: server_config.route,
114            target: leak!(format!("Worker::{}", worker_id)),
115            app_dir: server_config.app_dir,
116            wtype: server_config.server_type,
117            rscript: server_config.rscript,
118            quarto: server_config.quarto,
119            qmd: server_config.qmd,
120            handle: leak!(Mutex::new(None)),
121            shutdown,
122            idle_stop: leak!(Notify::new()),
123        }
124    }
125    #[allow(dead_code)]
126    #[cfg(test)]
127    pub fn dummy(target: &'static str, addr: &str, online: bool) -> WorkerConfig {
128        WorkerConfig {
129            target,
130            is_online: leak!(AtomicBool::new(online)),
131            addr: addr.parse().unwrap(),
132            app_dir: None,
133            worker_route: None,
134            rscript: OsStr::new(""),
135            wtype: WorkerType::Dummy,
136            worker_id: 1,
137            quarto: OsStr::new(""),
138            workdir: Path::new("."),
139            qmd: None,
140            handle: leak!(Mutex::new(None)),
141            shutdown: leak!(ShutdownSignal::new()),
142            idle_stop: leak!(Notify::new()),
143        }
144    }
145}
146
147fn spawn_child_rscript_process(
148    config: &WorkerConfig,
149    command: impl AsRef<str>,
150) -> FaucetResult<Child> {
151    let mut cmd = tokio::process::Command::new(config.rscript);
152
153    // Set the current directory to the directory containing the entrypoint
154    cmd.current_dir(config.workdir)
155        .arg("-e")
156        .arg(command.as_ref())
157        .stdin(std::process::Stdio::null())
158        .stdout(std::process::Stdio::piped())
159        .stderr(std::process::Stdio::piped())
160        .env("FAUCET_WORKER_ID", config.worker_id.to_string())
161        // This is needed to make sure the child process is killed when the parent is dropped
162        .kill_on_drop(true);
163
164    #[cfg(unix)]
165    unsafe {
166        cmd.pre_exec(|| {
167            // Create a new process group for the child process
168            nix::libc::setpgid(0, 0);
169            Ok(())
170        });
171    }
172
173    cmd.spawn().map_err(Into::into)
174}
175
176fn spawn_plumber_worker(config: &WorkerConfig) -> FaucetResult<Child> {
177    let command = format!(
178        r#"
179        options("plumber.port" = {port})
180        plumber::pr_run(plumber::plumb())
181        "#,
182        port = config.addr.port()
183    );
184    let child = spawn_child_rscript_process(config, command)?;
185
186    log_stdio(child, config.target)
187}
188
189fn spawn_shiny_worker(config: &WorkerConfig) -> FaucetResult<Child> {
190    let command = format!(
191        r###"
192        options("shiny.port" = {port})
193        options(shiny.http.response.filter = function(...) {{
194          response <- list(...)[[length(list(...))]]
195          if (response$status < 200 || response$status > 300) return(response)
196          if ('file' %in% names(response$content)) return(response)
197          if (!grepl("^text/html", response$content_type, perl = T)) return(response)
198          if (is.raw(response$content)) response$content <- rawToChar(response$content)
199          response$content <- sub("</head>", '<script src="__faucet__/reconnect.js"></script></head>', response$content, ignore.case = T)
200          return(response)
201        }})
202        shiny::runApp("{app_dir}")
203        "###,
204        port = config.addr.port(),
205        app_dir = config.app_dir.unwrap_or(".")
206    );
207    let child = spawn_child_rscript_process(config, command)?;
208
209    log_stdio(child, config.target)
210}
211
212fn spawn_quarto_shiny_worker(config: &WorkerConfig) -> FaucetResult<Child> {
213    let mut cmd = tokio::process::Command::new(config.quarto);
214    // Set the current directory to the directory containing the entrypoint
215    cmd.current_dir(config.workdir)
216        .arg("serve")
217        .args(["--port", config.addr.port().to_string().as_str()])
218        .arg(config.qmd.ok_or(FaucetError::MissingArgument("qmd"))?)
219        .stdin(std::process::Stdio::null())
220        .stdout(std::process::Stdio::piped())
221        .stderr(std::process::Stdio::piped())
222        .env("FAUCET_WORKER_ID", config.worker_id.to_string())
223        // This is needed to make sure the child process is killed when the parent is dropped
224        .kill_on_drop(true);
225
226    #[cfg(unix)]
227    unsafe {
228        cmd.pre_exec(|| {
229            // Create a new process group for the child process
230            nix::libc::setpgid(0, 0);
231            Ok(())
232        });
233    }
234
235    let child = cmd.spawn()?;
236
237    log_stdio(child, config.target)
238}
239
240impl WorkerConfig {
241    fn spawn_process(&self) -> FaucetResult<Child> {
242        let child_result = match self.wtype {
243            WorkerType::Plumber => spawn_plumber_worker(self),
244            WorkerType::Shiny => spawn_shiny_worker(self),
245            WorkerType::QuartoShiny => spawn_quarto_shiny_worker(self),
246            #[cfg(test)]
247            WorkerType::Dummy => unreachable!(
248                "WorkerType::Dummy should be handled in spawn_worker_task and not reach spawn_process"
249            ),
250        };
251
252        match child_result {
253            Ok(child) => Ok(child),
254            Err(e) => {
255                log::error!(target: "faucet", "Failed to invoke R for {target}: {e}", target = self.target);
256                Err(e)
257            }
258        }
259    }
260    pub async fn wait_until_done(&self) {
261        if let Some(handle) = self.handle.lock().await.take() {
262            log::debug!("Waiting for process to be finished");
263            match handle.await {
264                Ok(Ok(_)) => {
265                    log::debug!("Task ended successfully!")
266                }
267                Ok(Err(e)) => {
268                    panic!("Worker task for target '{}' failed: {:?}", self.target, e);
269                }
270                Err(e) => {
271                    panic!(
272                        "Worker task for target '{}' panicked or was cancelled: {:?}",
273                        self.target, e
274                    );
275                }
276            }
277        }
278    }
279    pub async fn spawn_worker_task(&'static self) {
280        let mut handle = self.handle.lock().await;
281
282        if let Some(handle) = handle.as_ref() {
283            if !handle.is_finished() {
284                log::warn!(target: "faucet", "Worker task for {target} is already running, skipping spawn", target = self.target);
285                return;
286            }
287        }
288
289        *handle = Some(tokio::spawn(async move {
290            #[cfg(test)]
291            if self.wtype == WorkerType::Dummy {
292                log::debug!(
293                    target: "faucet",
294                    "Worker {target} is type Dummy, skipping real process spawn.",
295                    target = self.target
296                );
297                return FaucetResult::Ok(());
298            }
299
300            'outer: loop {
301                let mut child = match self.spawn_process() {
302                    Ok(c) => c,
303                    Err(e) => {
304                        log::error!(
305                            target: "faucet",
306                            "Worker task for {target} failed to spawn initial process: {e}",
307                            target = self.target
308                        );
309                        return Err(e);
310                    }
311                };
312
313                let pid = match child.id() {
314                    Some(id) => id,
315                    None => {
316                        let err_msg = format!(
317                            "Spawned process for {target} has no PID",
318                            target = self.target
319                        );
320                        log::error!(target: "faucet", "{err_msg}");
321                        return Err(FaucetError::Unknown(err_msg));
322                    }
323                };
324
325                // We will run this loop asynchrnously on this same thread.
326                // We will use this to wait for either the stop signal
327                // or the child exiting
328                let child_loop = async {
329                    log::info!(target: "faucet", "Starting process {pid} for {target} on port {port}", port = self.addr.port(), target = self.target);
330                    loop {
331                        // Try to connect to the socket
332                        let check_status = check_if_online(self.addr).await;
333                        // If it's online, we can break out of the loop and start serving connections
334                        if check_status {
335                            log::info!(target: "faucet", "{target} is online and ready to serve connections at {route}", target = self.target, route = self.worker_route.unwrap_or("/"));
336                            self.is_online.store(check_status, Ordering::SeqCst);
337                            break;
338                        }
339                        // If it's not online but the child process has exited, we should break out of the loop
340                        // and restart the process
341                        if child.try_wait()?.is_some() {
342                            break;
343                        }
344
345                        tokio::time::sleep(RECHECK_INTERVAL).await;
346                    }
347                    FaucetResult::Ok(child.wait().await?)
348                };
349                tokio::select! {
350                    // If we receive a stop signal that means we will stop the outer loop
351                    // and kill the process
352                    _ = self.shutdown.wait() => {
353                        let _ = child.kill().await;
354                        log::info!(target: "faucet", "{target}'s process ({pid}) killed for shutdown", target = self.target);
355                        break 'outer;
356                    },
357                    _ = self.idle_stop.notified() => {
358                        self.is_online.store(false, std::sync::atomic::Ordering::SeqCst);
359                        let _ = child.kill().await;
360                        log::info!(target: "faucet", "{target}'s process ({pid}) killed for idle stop", target = self.target);
361                        break 'outer;
362                    },
363                    // If our child loop stops that means the process crashed. We will restart it
364                    status = child_loop => {
365                       self
366                            .is_online
367                            .store(false, std::sync::atomic::Ordering::SeqCst);
368                        log::error!(target: "faucet", "{target}'s process ({}) exited with status {}", pid, status?, target = self.target);
369                        continue 'outer;
370                    }
371                }
372            }
373            log::debug!("{target}'s process has ended.", target = self.target);
374            FaucetResult::Ok(())
375        }));
376    }
377}
378
379async fn check_if_online(addr: SocketAddr) -> bool {
380    let stream = tokio::net::TcpStream::connect(addr).await;
381    stream.is_ok()
382}
383
384const RECHECK_INTERVAL: Duration = Duration::from_millis(250);
385
386pub struct WorkerConfigs {
387    pub workers: Box<[&'static WorkerConfig]>,
388}
389
390const TRIES: usize = 20;
391
392impl WorkerConfigs {
393    pub(crate) async fn new(
394        server_config: FaucetServerConfig,
395        shutdown: &'static ShutdownSignal,
396    ) -> FaucetResult<Self> {
397        let mut workers =
398            Vec::<&'static WorkerConfig>::with_capacity(server_config.n_workers.get());
399
400        for id in 0..server_config.n_workers.get() {
401            // Probably hacky but it works. I need to guarantee that ports are never
402            // reused
403            let socket_addr = 'find_socket: loop {
404                let addr_candidate = get_available_socket(TRIES).await?;
405                // Check if another worker has already reserved this port
406                if workers.iter().any(|w| w.addr == addr_candidate) {
407                    continue 'find_socket;
408                }
409                break 'find_socket addr_candidate;
410            };
411
412            let config = leak!(WorkerConfig::new(
413                id + 1,
414                socket_addr,
415                &server_config,
416                shutdown
417            )) as &'static WorkerConfig;
418            workers.push(config);
419        }
420
421        let workers = workers.into_boxed_slice();
422
423        Ok(Self { workers })
424    }
425}