1use crate::config::MapServiceCfg;
18use crate::dispatcher::{DispatchConfig, Dispatcher};
19use crate::wms_fcgi_backend::FcgiBackendType;
20use async_process::{Child as ChildProcess, Command, Stdio};
21use async_trait::async_trait;
22use bbox_core::config::Loglevel;
23use bufstream::BufStream;
24use fastcgi_client::Client;
25use log::{debug, error, info, warn};
26use std::os::unix::io::{FromRawFd, IntoRawFd};
27use std::os::unix::net::{UnixListener, UnixStream};
28use std::path::{Path, PathBuf};
29use std::time::Duration;
30use tempfile::TempDir;
31
32struct FcgiProcess {
36 child: ChildProcess,
37 socket_path: String,
38}
39
40impl FcgiProcess {
41 pub async fn spawn(
42 fcgi_bin: &str,
43 base_dir: Option<&PathBuf>,
44 envs: &[(&str, &str)],
45 socket_path: &str,
46 ) -> std::io::Result<Self> {
47 let child = FcgiProcess::spawn_process(fcgi_bin, base_dir, envs, socket_path)?;
48 Ok(FcgiProcess {
49 child,
50 socket_path: socket_path.to_string(),
51 })
52 }
53
54 pub async fn respawn(
55 &mut self,
56 fcgi_bin: &str,
57 base_dir: Option<&PathBuf>,
58 envs: &[(&str, &str)],
59 ) -> std::io::Result<()> {
60 self.child = FcgiProcess::spawn_process(fcgi_bin, base_dir, envs, &self.socket_path)?;
61 Ok(())
62 }
63
64 fn spawn_process(
65 fcgi_bin: &str,
66 base_dir: Option<&PathBuf>,
67 envs: &[(&str, &str)],
68 socket_path: &str,
69 ) -> std::io::Result<ChildProcess> {
70 debug!("Spawning {fcgi_bin} on {socket_path}");
71 let socket = Path::new(socket_path);
72 if socket.exists() {
73 std::fs::remove_file(socket)?;
74 }
75 let listener = UnixListener::bind(socket)?;
76 let fd = listener.into_raw_fd();
77 let fcgi_io = unsafe { Stdio::from_raw_fd(fd) };
78
79 let mut cmd = Command::new(fcgi_bin);
80 cmd.stdin(fcgi_io);
81 cmd.kill_on_drop(true);
82 if let Some(dir) = base_dir {
83 cmd.current_dir(dir);
84 }
85 cmd.envs(envs.to_vec());
86 let child = cmd.spawn()?;
87
88 Ok(child)
89 }
90
91 pub fn is_running(&mut self) -> std::io::Result<bool> {
92 Ok(self.child.try_status()?.is_none())
93 }
94}
95
96impl Drop for FcgiProcess {
97 fn drop(&mut self) {
98 let socket = Path::new(&self.socket_path);
99 if socket.exists() {
100 debug!("Removing socket {}", &self.socket_path);
101 let _ = std::fs::remove_file(socket);
102 }
103 }
104}
105
106pub struct FcgiProcessPool {
110 fcgi_bin: String,
111 base_dir: Option<PathBuf>,
112 envs: Vec<(String, String)>,
113 backend_name: String,
114 pub(crate) suffixes: Vec<FcgiSuffixUrl>,
115 num_processes: usize,
116 socket_dir: TempDir,
117 processes: Vec<FcgiProcess>,
118}
119
120#[derive(Clone)]
121pub struct FcgiSuffixUrl {
122 pub suffix: String,
123 pub url_base: String,
124}
125
126impl FcgiProcessPool {
127 pub fn new(
128 fcgi_bin: String,
129 base_dir: Option<PathBuf>,
130 backend: &dyn FcgiBackendType,
131 loglevel: &Option<Loglevel>,
132 num_processes: usize,
133 ) -> Self {
134 let socket_dir = TempDir::with_prefix("bbox-").expect("TempDir creation");
136 FcgiProcessPool {
137 fcgi_bin,
138 base_dir,
139 envs: backend.envs(loglevel),
140 backend_name: backend.name().to_string(),
141 suffixes: backend
142 .project_files()
143 .iter()
144 .flat_map(|s| {
145 backend.url_base(s).map(|b| FcgiSuffixUrl {
146 suffix: s.to_string(),
147 url_base: b.to_string(),
148 })
149 })
150 .collect(),
151 socket_dir,
152 num_processes,
153 processes: Vec::new(),
154 }
155 }
156 fn socket_path(&self, process_no: usize) -> String {
158 self.socket_dir
159 .path()
160 .join(format!("fcgi_{}_{process_no}.sock", self.backend_name))
161 .to_string_lossy()
162 .to_string()
163 }
164 pub async fn spawn_processes(&mut self) -> std::io::Result<()> {
165 let envs: Vec<_> = self
166 .envs
167 .iter()
168 .map(|(k, v)| (k.as_str(), v.as_str()))
169 .collect();
170 for no in 0..self.num_processes {
171 let socket_path = self.socket_path(no);
172 let process =
173 FcgiProcess::spawn(&self.fcgi_bin, self.base_dir.as_ref(), &envs, &socket_path)
174 .await?;
175 self.processes.push(process)
176 }
177 info!(
178 "Spawned {} FCGI processes '{}'",
179 self.processes.len(),
180 &self.fcgi_bin
181 );
182 Ok(())
183 }
184
185 pub fn client_dispatcher(&self, wms_config: &MapServiceCfg) -> FcgiDispatcher {
187 debug!("Creating {} FcgiDispatcher", self.backend_name);
188 let config = DispatchConfig::new();
189 let pools = (0..self.num_processes)
190 .map(|no| {
191 let socket_path = self.socket_path(no);
192 let handler = FcgiClientHandler { socket_path };
193 FcgiClientPool::builder(handler)
194 .max_size(wms_config.fcgi_client_pool_size)
195 .runtime(deadpool::Runtime::Tokio1)
196 .wait_timeout(wms_config.wait_timeout.map(Duration::from_millis))
197 .create_timeout(wms_config.create_timeout.map(Duration::from_millis))
198 .recycle_timeout(wms_config.recycle_timeout.map(Duration::from_millis))
199 .build()
200 .expect("FcgiClientPool::builder")
201 })
202 .collect();
203 let dispatcher = Dispatcher::new(&config, &pools);
204 FcgiDispatcher {
205 backend_name: self.backend_name.clone(),
206 pools,
207 dispatcher,
208 suffixes: self.suffixes.clone(),
209 }
210 }
211
212 async fn check_process(&mut self, no: usize) -> std::io::Result<()> {
213 if let Some(p) = self.processes.get_mut(no) {
214 match p.is_running() {
215 Ok(true) => {} Ok(false) => {
217 warn!("process[{no}] not running - restarting...");
218 let envs: Vec<_> = self
219 .envs
220 .iter()
221 .map(|(k, v)| (k.as_str(), v.as_str()))
222 .collect();
223 if let Err(e) = p
224 .respawn(&self.fcgi_bin, self.base_dir.as_ref(), &envs)
225 .await
226 {
227 warn!("process[{no}] restarting error: {e}");
228 }
229 }
230 Err(e) => debug!("process[{no}].is_running(): {e}"),
231 }
232 } else {
233 error!("process[{no}] does not exist");
234 }
235 Ok(())
236 }
237
238 pub async fn watchdog_loop(&mut self) {
239 loop {
240 for no in 0..self.processes.len() {
242 let _ = self.check_process(no).await;
243 }
244 tokio::time::sleep(Duration::from_secs(1)).await;
245 }
246 }
247}
248
249#[derive(Clone)]
252pub struct FcgiClientHandler {
253 socket_path: String,
254}
255
256impl FcgiClientHandler {
257 fn fcgi_client(&self) -> std::io::Result<FcgiClient> {
258 let stream = UnixStream::connect(&self.socket_path)?;
259 let fcgi_client = Client::new(stream, true);
261 Ok(fcgi_client)
262 }
263}
264
265pub type FcgiClient = fastcgi_client::Client<BufStream<UnixStream>>;
266
267pub type FcgiClientPoolError = std::io::Error;
270
271#[async_trait]
272impl deadpool::managed::Manager for FcgiClientHandler {
273 type Type = FcgiClient;
274 type Error = FcgiClientPoolError;
275 async fn create(&self) -> Result<FcgiClient, FcgiClientPoolError> {
276 debug!("deadpool::managed::Manager::create {}", &self.socket_path);
277 let client = self.fcgi_client();
278 if let Err(ref e) = client {
279 debug!("Failed to create client {}: {e}", &self.socket_path);
280 }
281 client
282 }
283 async fn recycle(
284 &self,
285 _fcgi: &mut FcgiClient,
286 ) -> deadpool::managed::RecycleResult<FcgiClientPoolError> {
287 debug!("deadpool::managed::Manager::recycle {}", &self.socket_path);
288 Ok(())
289 }
293}
294
295pub type FcgiClientPool = deadpool::managed::Pool<FcgiClientHandler>;
296
297pub struct FcgiDispatcher {
301 backend_name: String,
302 pools: Vec<FcgiClientPool>,
304 dispatcher: Dispatcher,
306 pub(crate) suffixes: Vec<FcgiSuffixUrl>,
308}
309
310impl FcgiDispatcher {
311 pub fn backend_name(&self) -> &str {
312 &self.backend_name
313 }
314 pub fn select(&self, query_str: &str) -> (usize, &FcgiClientPool) {
317 let poolno = self.dispatcher.select(query_str);
318 let pool = &self.pools[poolno];
319 debug!("selected pool {poolno}: client {:?}", pool.status());
320 (poolno, pool)
321 }
322 pub fn remove(&self, fcgi_client: deadpool::managed::Object<FcgiClientHandler>) {
324 debug!("Removing Client from FcgiClientPool");
326 let _obj = deadpool::managed::Object::take(fcgi_client);
327 }
331}