bbox_map_server/
fcgi_process.rs

1//! FCGI process management
2//!
3//! ```md
4//! ┌────────────────────┐         ┌─────────────────┐
5//! │FcgiDispatcher      │         │FcgiProcessPool  │
6//! │ ┌────────────────┐ │ socket1 │ ┌─────────────┐ │
7//! │ │ FcgiClientPool ├─┼─────────┤►│ FcgiProcess │ │
8//! │ └────────────────┘ │         │ └─────────────┘ │
9//! │                    │         │                 │
10//! │ ┌────────────────┐ │ socket2 │ ┌─────────────┐ │
11//! │ │ FcgiClientPool ├─┼─────────┤►│ FcgiProcess │ │
12//! │ └────────────────┘ │         │ └─────────────┘ │
13//! │                    │         │                 │
14//! └────────────────────┘         └─────────────────┘
15//! ```
16
17use crate::config::MapServiceCfg;
18use crate::dispatcher::{DispatchConfig, Dispatcher};
19use crate::wms_fcgi_backend::FcgiBackendType;
20use async_process::{Child as ChildProcess, Command, Stdio};
21use async_trait::async_trait;
22use bbox_core::config::Loglevel;
23use bufstream::BufStream;
24use fastcgi_client::Client;
25use log::{debug, error, info, warn};
26use std::os::unix::io::{FromRawFd, IntoRawFd};
27use std::os::unix::net::{UnixListener, UnixStream};
28use std::path::{Path, PathBuf};
29use std::time::Duration;
30use tempfile::TempDir;
31
32// --- FCGI Process ---
33
/// Child process with FCGI communication
///
/// Bundles the handle of a spawned FCGI application with the path of the
/// Unix domain socket the application was started on. The socket file is
/// removed again when the value is dropped (see the `Drop` implementation).
struct FcgiProcess {
    /// Handle of the spawned child process
    child: ChildProcess,
    /// Filesystem path of the Unix domain socket bound for this process
    socket_path: String,
}
39
40impl FcgiProcess {
41    pub async fn spawn(
42        fcgi_bin: &str,
43        base_dir: Option<&PathBuf>,
44        envs: &[(&str, &str)],
45        socket_path: &str,
46    ) -> std::io::Result<Self> {
47        let child = FcgiProcess::spawn_process(fcgi_bin, base_dir, envs, socket_path)?;
48        Ok(FcgiProcess {
49            child,
50            socket_path: socket_path.to_string(),
51        })
52    }
53
54    pub async fn respawn(
55        &mut self,
56        fcgi_bin: &str,
57        base_dir: Option<&PathBuf>,
58        envs: &[(&str, &str)],
59    ) -> std::io::Result<()> {
60        self.child = FcgiProcess::spawn_process(fcgi_bin, base_dir, envs, &self.socket_path)?;
61        Ok(())
62    }
63
64    fn spawn_process(
65        fcgi_bin: &str,
66        base_dir: Option<&PathBuf>,
67        envs: &[(&str, &str)],
68        socket_path: &str,
69    ) -> std::io::Result<ChildProcess> {
70        debug!("Spawning {fcgi_bin} on {socket_path}");
71        let socket = Path::new(socket_path);
72        if socket.exists() {
73            std::fs::remove_file(socket)?;
74        }
75        let listener = UnixListener::bind(socket)?;
76        let fd = listener.into_raw_fd();
77        let fcgi_io = unsafe { Stdio::from_raw_fd(fd) };
78
79        let mut cmd = Command::new(fcgi_bin);
80        cmd.stdin(fcgi_io);
81        cmd.kill_on_drop(true);
82        if let Some(dir) = base_dir {
83            cmd.current_dir(dir);
84        }
85        cmd.envs(envs.to_vec());
86        let child = cmd.spawn()?;
87
88        Ok(child)
89    }
90
91    pub fn is_running(&mut self) -> std::io::Result<bool> {
92        Ok(self.child.try_status()?.is_none())
93    }
94}
95
96impl Drop for FcgiProcess {
97    fn drop(&mut self) {
98        let socket = Path::new(&self.socket_path);
99        if socket.exists() {
100            debug!("Removing socket {}", &self.socket_path);
101            let _ = std::fs::remove_file(socket);
102        }
103    }
104}
105
106// --- FCGI Process Pool ---
107
/// Collection of processes for one FCGI application
///
/// All processes are started from the same executable with the same
/// environment; each one gets its own socket inside `socket_dir`.
pub struct FcgiProcessPool {
    /// Executable started for every process of the pool
    fcgi_bin: String,
    /// Optional working directory of the child processes
    base_dir: Option<PathBuf>,
    /// Environment variables passed to every child process
    envs: Vec<(String, String)>,
    /// Backend name, used in socket file names and log messages
    backend_name: String,
    /// Suffix/URL base pairs collected from the backend's project files
    pub(crate) suffixes: Vec<FcgiSuffixUrl>,
    /// Number of processes to spawn
    num_processes: usize,
    /// Temporary directory containing the process sockets
    socket_dir: TempDir,
    /// Spawned processes, filled by `spawn_processes`
    processes: Vec<FcgiProcess>,
}
119
/// Pairing of a project file suffix with the URL base reported by the backend
#[derive(Clone)]
pub struct FcgiSuffixUrl {
    /// Project file suffix as listed by the backend's `project_files()`
    pub suffix: String,
    /// URL base the backend's `url_base()` returned for this suffix
    pub url_base: String,
}
125
126impl FcgiProcessPool {
127    pub fn new(
128        fcgi_bin: String,
129        base_dir: Option<PathBuf>,
130        backend: &dyn FcgiBackendType,
131        loglevel: &Option<Loglevel>,
132        num_processes: usize,
133    ) -> Self {
134        // We use the system temp path, but according to FHS /run would be correct
135        let socket_dir = TempDir::with_prefix("bbox-").expect("TempDir creation");
136        FcgiProcessPool {
137            fcgi_bin,
138            base_dir,
139            envs: backend.envs(loglevel),
140            backend_name: backend.name().to_string(),
141            suffixes: backend
142                .project_files()
143                .iter()
144                .flat_map(|s| {
145                    backend.url_base(s).map(|b| FcgiSuffixUrl {
146                        suffix: s.to_string(),
147                        url_base: b.to_string(),
148                    })
149                })
150                .collect(),
151            socket_dir,
152            num_processes,
153            processes: Vec::new(),
154        }
155    }
156    /// Constant socket path over application lifetime
157    fn socket_path(&self, process_no: usize) -> String {
158        self.socket_dir
159            .path()
160            .join(format!("fcgi_{}_{process_no}.sock", self.backend_name))
161            .to_string_lossy()
162            .to_string()
163    }
164    pub async fn spawn_processes(&mut self) -> std::io::Result<()> {
165        let envs: Vec<_> = self
166            .envs
167            .iter()
168            .map(|(k, v)| (k.as_str(), v.as_str()))
169            .collect();
170        for no in 0..self.num_processes {
171            let socket_path = self.socket_path(no);
172            let process =
173                FcgiProcess::spawn(&self.fcgi_bin, self.base_dir.as_ref(), &envs, &socket_path)
174                    .await?;
175            self.processes.push(process)
176        }
177        info!(
178            "Spawned {} FCGI processes '{}'",
179            self.processes.len(),
180            &self.fcgi_bin
181        );
182        Ok(())
183    }
184
185    /// Create client pool for each process and return dispatcher
186    pub fn client_dispatcher(&self, wms_config: &MapServiceCfg) -> FcgiDispatcher {
187        debug!("Creating {} FcgiDispatcher", self.backend_name);
188        let config = DispatchConfig::new();
189        let pools = (0..self.num_processes)
190            .map(|no| {
191                let socket_path = self.socket_path(no);
192                let handler = FcgiClientHandler { socket_path };
193                FcgiClientPool::builder(handler)
194                    .max_size(wms_config.fcgi_client_pool_size)
195                    .runtime(deadpool::Runtime::Tokio1)
196                    .wait_timeout(wms_config.wait_timeout.map(Duration::from_millis))
197                    .create_timeout(wms_config.create_timeout.map(Duration::from_millis))
198                    .recycle_timeout(wms_config.recycle_timeout.map(Duration::from_millis))
199                    .build()
200                    .expect("FcgiClientPool::builder")
201            })
202            .collect();
203        let dispatcher = Dispatcher::new(&config, &pools);
204        FcgiDispatcher {
205            backend_name: self.backend_name.clone(),
206            pools,
207            dispatcher,
208            suffixes: self.suffixes.clone(),
209        }
210    }
211
212    async fn check_process(&mut self, no: usize) -> std::io::Result<()> {
213        if let Some(p) = self.processes.get_mut(no) {
214            match p.is_running() {
215                Ok(true) => {} // ok
216                Ok(false) => {
217                    warn!("process[{no}] not running - restarting...");
218                    let envs: Vec<_> = self
219                        .envs
220                        .iter()
221                        .map(|(k, v)| (k.as_str(), v.as_str()))
222                        .collect();
223                    if let Err(e) = p
224                        .respawn(&self.fcgi_bin, self.base_dir.as_ref(), &envs)
225                        .await
226                    {
227                        warn!("process[{no}] restarting error: {e}");
228                    }
229                }
230                Err(e) => debug!("process[{no}].is_running(): {e}"),
231            }
232        } else {
233            error!("process[{no}] does not exist");
234        }
235        Ok(())
236    }
237
238    pub async fn watchdog_loop(&mut self) {
239        loop {
240            // debug!("Checking process pool");
241            for no in 0..self.processes.len() {
242                let _ = self.check_process(no).await;
243            }
244            tokio::time::sleep(Duration::from_secs(1)).await;
245        }
246    }
247}
248
249// --- FCGI Client ---
250
/// Factory for FCGI clients connecting to the socket of one FCGI process
///
/// Serves as `deadpool::managed::Manager` of a [`FcgiClientPool`].
#[derive(Clone)]
pub struct FcgiClientHandler {
    /// Path of the Unix domain socket new clients connect to
    socket_path: String,
}
255
256impl FcgiClientHandler {
257    fn fcgi_client(&self) -> std::io::Result<FcgiClient> {
258        let stream = UnixStream::connect(&self.socket_path)?;
259        // let stream = TcpStream::connect(("127.0.0.1", 9000)).unwrap();
260        let fcgi_client = Client::new(stream, true);
261        Ok(fcgi_client)
262    }
263}
264
/// FCGI client talking to a process over a buffered Unix domain socket stream
pub type FcgiClient = fastcgi_client::Client<BufStream<UnixStream>>;
266
267// --- FCGI Client Pool ---
268
/// Error type of the client pool manager (plain I/O errors)
pub type FcgiClientPoolError = std::io::Error;
270
271#[async_trait]
272impl deadpool::managed::Manager for FcgiClientHandler {
273    type Type = FcgiClient;
274    type Error = FcgiClientPoolError;
275    async fn create(&self) -> Result<FcgiClient, FcgiClientPoolError> {
276        debug!("deadpool::managed::Manager::create {}", &self.socket_path);
277        let client = self.fcgi_client();
278        if let Err(ref e) = client {
279            debug!("Failed to create client {}: {e}", &self.socket_path);
280        }
281        client
282    }
283    async fn recycle(
284        &self,
285        _fcgi: &mut FcgiClient,
286    ) -> deadpool::managed::RecycleResult<FcgiClientPoolError> {
287        debug!("deadpool::managed::Manager::recycle {}", &self.socket_path);
288        Ok(())
289        // Err(deadpool::managed::RecycleError::Message(
290        //     "client invalid".to_string(),
291        // ))
292    }
293}
294
/// Pool of FCGI clients created by a [`FcgiClientHandler`]
pub type FcgiClientPool = deadpool::managed::Pool<FcgiClientHandler>;
296
297// --- FCGI Dispatching ---
298
/// FCGI client dispatcher
///
/// Holds one client pool per FCGI process and selects the pool to use for a
/// given request.
pub struct FcgiDispatcher {
    /// Name of the backend this dispatcher was created for
    backend_name: String,
    /// Client pool for each FCGI process
    pools: Vec<FcgiClientPool>,
    /// Mode-dependent dispatcher
    dispatcher: Dispatcher,
    /// Suffix info for endpoint registration
    pub(crate) suffixes: Vec<FcgiSuffixUrl>,
}
309
310impl FcgiDispatcher {
311    pub fn backend_name(&self) -> &str {
312        &self.backend_name
313    }
314    /// Select FCGI process
315    /// Returns process index and FCGI client pool
316    pub fn select(&self, query_str: &str) -> (usize, &FcgiClientPool) {
317        let poolno = self.dispatcher.select(query_str);
318        let pool = &self.pools[poolno];
319        debug!("selected pool {poolno}: client {:?}", pool.status());
320        (poolno, pool)
321    }
322    /// Remove possibly broken client
323    pub fn remove(&self, fcgi_client: deadpool::managed::Object<FcgiClientHandler>) {
324        // Can't call with `&mut self` from web service thread
325        debug!("Removing Client from FcgiClientPool");
326        let _obj = deadpool::managed::Object::take(fcgi_client);
327        // TODO: remove all clients with same socket path
328        // Possible implementation:
329        // Return error in FcgiClientHandler::recycle when self.socket_path is younger than FcgiClient
330    }
331}