use crate::config::MapServiceCfg;
use crate::dispatcher::{DispatchConfig, Dispatcher};
use crate::wms_fcgi_backend::FcgiBackendType;
use async_process::{Child as ChildProcess, Command, Stdio};
use async_trait::async_trait;
use bbox_core::config::Loglevel;
use bufstream::BufStream;
use fastcgi_client::Client;
use log::{debug, error, info, warn};
use std::os::unix::io::{FromRawFd, IntoRawFd};
use std::os::unix::net::{UnixListener, UnixStream};
use std::path::{Path, PathBuf};
use std::time::Duration;
use tempfile::TempDir;
/// A spawned FCGI child process together with the Unix socket path it was
/// started on.
struct FcgiProcess {
/// Handle of the child process; replaced by `respawn`.
child: ChildProcess,
/// Filesystem path of the Unix socket bound for this process. The file is
/// removed again when the `FcgiProcess` is dropped.
socket_path: String,
}
impl FcgiProcess {
    /// Starts `fcgi_bin` with a freshly bound Unix listener at `socket_path`
    /// attached to its stdin.
    ///
    /// * `base_dir` - working directory for the child, if any.
    /// * `envs` - environment variables added to the child's environment.
    ///
    /// The function is `async` but performs no awaits itself.
    ///
    /// # Errors
    ///
    /// Returns the I/O error from removing a stale socket file, binding the
    /// listener or spawning the process.
    pub async fn spawn(
        fcgi_bin: &str,
        base_dir: Option<&PathBuf>,
        envs: &[(&str, &str)],
        socket_path: &str,
    ) -> std::io::Result<Self> {
        let child = FcgiProcess::spawn_process(fcgi_bin, base_dir, envs, socket_path)?;
        Ok(FcgiProcess {
            child,
            socket_path: socket_path.to_string(),
        })
    }

    /// Starts a new child on the socket path this process was created with
    /// and stores its handle in place of the previous one.
    ///
    /// The previous child handle is dropped by the assignment (it was created
    /// with `kill_on_drop(true)`). On error the previous handle is kept.
    ///
    /// # Errors
    ///
    /// Same failure modes as [`FcgiProcess::spawn`].
    pub async fn respawn(
        &mut self,
        fcgi_bin: &str,
        base_dir: Option<&PathBuf>,
        envs: &[(&str, &str)],
    ) -> std::io::Result<()> {
        self.child = FcgiProcess::spawn_process(fcgi_bin, base_dir, envs, &self.socket_path)?;
        Ok(())
    }

    /// Binds a Unix listener at `socket_path` and spawns `fcgi_bin` with that
    /// listener as its stdin.
    fn spawn_process(
        fcgi_bin: &str,
        base_dir: Option<&PathBuf>,
        envs: &[(&str, &str)],
        socket_path: &str,
    ) -> std::io::Result<ChildProcess> {
        debug!("Spawning {fcgi_bin} on {socket_path}");
        let socket = Path::new(socket_path);
        // Remove a stale socket unconditionally instead of `exists()` + remove:
        // `exists()` follows symlinks (so a dangling link would be missed and
        // make `bind` fail) and leaves a window between check and removal.
        match std::fs::remove_file(socket) {
            Ok(()) => {}
            Err(e) if e.kind() == std::io::ErrorKind::NotFound => {}
            Err(e) => return Err(e),
        }
        let listener = UnixListener::bind(socket)?;
        let fd = listener.into_raw_fd();
        // SAFETY: `into_raw_fd` released ownership of a valid, open descriptor
        // and nothing else refers to it; ownership is transferred exactly once
        // to the `Stdio`, which becomes responsible for closing it.
        let fcgi_io = unsafe { Stdio::from_raw_fd(fd) };
        let mut cmd = Command::new(fcgi_bin);
        // The listening socket is handed to the child as its stdin
        // (descriptor 0), where FastCGI applications expect it.
        cmd.stdin(fcgi_io);
        cmd.kill_on_drop(true);
        if let Some(dir) = base_dir {
            cmd.current_dir(dir);
        }
        // Pairs are `Copy`; no intermediate `Vec` is needed.
        cmd.envs(envs.iter().copied());
        cmd.spawn()
    }

    /// Returns `true` while `try_status` reports no exit status for the child.
    ///
    /// # Errors
    ///
    /// Propagates the I/O error from querying the child status.
    pub fn is_running(&mut self) -> std::io::Result<bool> {
        Ok(self.child.try_status()?.is_none())
    }
}
impl Drop for FcgiProcess {
    /// Best-effort cleanup of the socket file belonging to this process.
    ///
    /// A failure to delete the file is deliberately ignored.
    fn drop(&mut self) {
        let path = Path::new(&self.socket_path);
        if !path.exists() {
            // Nothing left to clean up.
            return;
        }
        debug!("Removing socket {}", &self.socket_path);
        let _ = std::fs::remove_file(path);
    }
}
/// A set of FCGI child processes started from the same executable, each
/// listening on its own Unix socket inside a temporary directory.
pub struct FcgiProcessPool {
/// Executable that is spawned for every process.
fcgi_bin: String,
/// Working directory for the spawned processes, if any.
base_dir: Option<PathBuf>,
/// Environment variables passed to every spawned process
/// (taken from the backend in `new`).
envs: Vec<(String, String)>,
/// Backend name; used in socket file names and log messages.
backend_name: String,
/// Project file suffixes with their URL base, as reported by the backend.
pub(crate) suffixes: Vec<FcgiSuffixUrl>,
/// Number of processes to spawn (and of client pools to create).
num_processes: usize,
/// Temporary directory (prefix `bbox-`) that holds the socket files.
socket_dir: TempDir,
/// Processes started by `spawn_processes`; empty until then.
processes: Vec<FcgiProcess>,
}
/// Pairing of a backend project file suffix with the URL base the backend
/// reports for it.
#[derive(Clone)]
pub struct FcgiSuffixUrl {
/// Entry of the backend's `project_files()` list.
pub suffix: String,
/// Value returned by the backend's `url_base` for `suffix`.
pub url_base: String,
}
impl FcgiProcessPool {
pub fn new(
fcgi_bin: String,
base_dir: Option<PathBuf>,
backend: &dyn FcgiBackendType,
loglevel: &Option<Loglevel>,
num_processes: usize,
) -> Self {
let socket_dir = TempDir::with_prefix("bbox-").expect("TempDir creation");
FcgiProcessPool {
fcgi_bin,
base_dir,
envs: backend.envs(loglevel),
backend_name: backend.name().to_string(),
suffixes: backend
.project_files()
.iter()
.flat_map(|s| {
backend.url_base(s).map(|b| FcgiSuffixUrl {
suffix: s.to_string(),
url_base: b.to_string(),
})
})
.collect(),
socket_dir,
num_processes,
processes: Vec::new(),
}
}
fn socket_path(&self, process_no: usize) -> String {
self.socket_dir
.path()
.join(format!("fcgi_{}_{process_no}.sock", self.backend_name))
.to_string_lossy()
.to_string()
}
pub async fn spawn_processes(&mut self) -> std::io::Result<()> {
let envs: Vec<_> = self
.envs
.iter()
.map(|(k, v)| (k.as_str(), v.as_str()))
.collect();
for no in 0..self.num_processes {
let socket_path = self.socket_path(no);
let process =
FcgiProcess::spawn(&self.fcgi_bin, self.base_dir.as_ref(), &envs, &socket_path)
.await?;
self.processes.push(process)
}
info!(
"Spawned {} FCGI processes '{}'",
self.processes.len(),
&self.fcgi_bin
);
Ok(())
}
pub fn client_dispatcher(&self, wms_config: &MapServiceCfg) -> FcgiDispatcher {
debug!("Creating {} FcgiDispatcher", self.backend_name);
let config = DispatchConfig::new();
let pools = (0..self.num_processes)
.map(|no| {
let socket_path = self.socket_path(no);
let handler = FcgiClientHandler { socket_path };
FcgiClientPool::builder(handler)
.max_size(wms_config.fcgi_client_pool_size)
.runtime(deadpool::Runtime::Tokio1)
.wait_timeout(wms_config.wait_timeout.map(Duration::from_millis))
.create_timeout(wms_config.create_timeout.map(Duration::from_millis))
.recycle_timeout(wms_config.recycle_timeout.map(Duration::from_millis))
.build()
.expect("FcgiClientPool::builder")
})
.collect();
let dispatcher = Dispatcher::new(&config, &pools);
FcgiDispatcher {
backend_name: self.backend_name.clone(),
pools,
dispatcher,
suffixes: self.suffixes.clone(),
}
}
async fn check_process(&mut self, no: usize) -> std::io::Result<()> {
if let Some(p) = self.processes.get_mut(no) {
match p.is_running() {
Ok(true) => {} Ok(false) => {
warn!("process[{no}] not running - restarting...");
let envs: Vec<_> = self
.envs
.iter()
.map(|(k, v)| (k.as_str(), v.as_str()))
.collect();
if let Err(e) = p
.respawn(&self.fcgi_bin, self.base_dir.as_ref(), &envs)
.await
{
warn!("process[{no}] restarting error: {e}");
}
}
Err(e) => debug!("process[{no}].is_running(): {e}"),
}
} else {
error!("process[{no}] does not exist");
}
Ok(())
}
pub async fn watchdog_loop(&mut self) {
loop {
for no in 0..self.processes.len() {
let _ = self.check_process(no).await;
}
tokio::time::sleep(Duration::from_secs(1)).await;
}
}
}
/// Pool manager that creates FCGI clients connected to one Unix socket.
#[derive(Clone)]
pub struct FcgiClientHandler {
/// Path of the Unix socket every created client connects to.
socket_path: String,
}
impl FcgiClientHandler {
    /// Connects to the handler's Unix socket and wraps the stream in an FCGI
    /// client (second constructor argument `true`).
    ///
    /// # Errors
    ///
    /// Returns the I/O error from connecting to the socket.
    fn fcgi_client(&self) -> std::io::Result<FcgiClient> {
        UnixStream::connect(&self.socket_path).map(|stream| Client::new(stream, true))
    }
}
/// FCGI client communicating over a buffered Unix stream.
pub type FcgiClient = fastcgi_client::Client<BufStream<UnixStream>>;
/// Error type reported by the client pool manager (plain I/O errors).
pub type FcgiClientPoolError = std::io::Error;
#[async_trait]
impl deadpool::managed::Manager for FcgiClientHandler {
    type Type = FcgiClient;
    type Error = FcgiClientPoolError;

    /// Opens a new client connection to the handler's socket.
    ///
    /// A connection failure is logged at debug level and returned unchanged.
    async fn create(&self) -> Result<FcgiClient, FcgiClientPoolError> {
        debug!("deadpool::managed::Manager::create {}", &self.socket_path);
        self.fcgi_client().map_err(|e| {
            debug!("Failed to create client {}: {e}", &self.socket_path);
            e
        })
    }

    /// Accepts every client for reuse; no health check is performed.
    async fn recycle(
        &self,
        _fcgi: &mut FcgiClient,
    ) -> deadpool::managed::RecycleResult<FcgiClientPoolError> {
        debug!("deadpool::managed::Manager::recycle {}", &self.socket_path);
        Ok(())
    }
}
/// Managed deadpool pool of FCGI clients created by `FcgiClientHandler`.
pub type FcgiClientPool = deadpool::managed::Pool<FcgiClientHandler>;
/// Selects one of several FCGI client pools for a request.
pub struct FcgiDispatcher {
/// Name of the backend the pools belong to.
backend_name: String,
/// Client pools, indexed by the pool number returned from `dispatcher`.
pools: Vec<FcgiClientPool>,
/// Strategy that maps a query string to a pool index.
dispatcher: Dispatcher,
/// Project file suffixes with their URL base, copied from the process pool.
pub(crate) suffixes: Vec<FcgiSuffixUrl>,
}
impl FcgiDispatcher {
    /// Name of the backend this dispatcher serves.
    pub fn backend_name(&self) -> &str {
        self.backend_name.as_str()
    }

    /// Lets the dispatcher pick a pool for `query_str` and returns the pool
    /// index together with the pool.
    ///
    /// # Panics
    ///
    /// Panics if the dispatcher returns an index outside of `pools`.
    pub fn select(&self, query_str: &str) -> (usize, &FcgiClientPool) {
        let poolno = self.dispatcher.select(query_str);
        let chosen = &self.pools[poolno];
        debug!("selected pool {poolno}: client {:?}", chosen.status());
        (poolno, chosen)
    }

    /// Detaches `fcgi_client` from its pool and discards it, so the client is
    /// not handed back to the pool.
    pub fn remove(&self, fcgi_client: deadpool::managed::Object<FcgiClientHandler>) {
        debug!("Removing Client from FcgiClientPool");
        drop(deadpool::managed::Object::take(fcgi_client));
    }
}