1use crate::fcgi_process::*;
2use crate::metrics::WmsMetrics;
3use crate::service::MapService;
4use actix_web::{guard, web, HttpRequest, HttpResponse};
5use bbox_core::service::{OgcApiService, ServiceEndpoints};
6use bbox_core::TileResponse;
7use log::{debug, info, warn};
8use opentelemetry::{
9 global,
10 trace::{SpanKind, TraceContextExt, Tracer},
11 Context, KeyValue, Value,
12};
13use std::io::{BufRead, Cursor};
14use std::str::FromStr;
15use std::time::{Duration, SystemTime};
16
/// Errors produced while forwarding a request to a FCGI backend.
#[derive(thiserror::Error, Debug)]
pub enum FcgiError {
    /// No FCGI client could be obtained from the client pool.
    #[error("FCGI timeout")]
    FcgiTimeout,
    /// The FCGI client reported an error while executing the request.
    #[error("FCGI request error")]
    FcgiRequestError,
    /// An underlying I/O error; `#[from]` generates the `From<std::io::Error>` conversion.
    #[error("I/O error")]
    IoError(#[from] std::io::Error),
}
26
27impl From<FcgiError> for actix_web::Error {
28 fn from(err: FcgiError) -> Self {
29 actix_web::error::ErrorInternalServerError(err)
30 }
31}
32
33async fn wms_fcgi(
38 fcgi_dispatcher: web::Data<FcgiDispatcher>,
39 suffix: web::Data<String>,
40 project: web::Path<String>,
41 metrics: web::Data<WmsMetrics>,
42 body: String,
43 req: HttpRequest,
44) -> Result<HttpResponse, actix_web::Error> {
45 let fcgi_query = format!("map={project}.{}&{}", suffix.as_str(), req.query_string());
47 let conn_info = req.connection_info().clone();
48 let request_params = HttpRequestParams {
49 scheme: conn_info.scheme(),
50 host: conn_info.host(),
51 req_path: req.path(),
52 metrics: &metrics,
53 };
54 wms_fcgi_request(
55 &fcgi_dispatcher,
56 &fcgi_query,
57 request_params,
58 req.method().as_str(),
59 body,
60 &project,
61 )
62 .await
63}
64
/// Borrowed properties of the incoming HTTP request that are needed to build
/// the FCGI request, plus the metrics registry to record into.
pub struct HttpRequestParams<'a> {
    /// URL scheme of the request; `"https"` makes the FCGI request carry `HTTPS=ON`.
    pub scheme: &'a str,
    /// Host of the request, optionally followed by `:port`.
    pub host: &'a str,
    /// Request path, forwarded as the FCGI request URI and used as a metrics label.
    pub req_path: &'a str,
    /// WMS metrics updated while the request is processed.
    pub metrics: &'a WmsMetrics,
}
71
72pub async fn wms_fcgi_request(
73 fcgi_dispatcher: &FcgiDispatcher,
74 fcgi_query: &str,
75 request_params: HttpRequestParams<'_>,
76 req_method: &str,
77 body: String,
78 project: &str,
79) -> Result<HttpResponse, actix_web::Error> {
80 let wms_resp = wms_fcgi_req(
81 fcgi_dispatcher,
82 fcgi_query,
83 request_params,
84 req_method,
85 body,
86 project,
87 )
88 .await?;
89 let mut response = HttpResponse::Ok();
90 for (key, value) in wms_resp.headers() {
91 response.insert_header((key, value));
92 }
94 Ok(response.streaming(wms_resp.into_stream()))
95}
96
97pub async fn wms_fcgi_req(
98 fcgi_dispatcher: &FcgiDispatcher,
99 fcgi_query: &str,
100 request_params: HttpRequestParams<'_>,
101 req_method: &str,
102 body: String,
103 project: &str,
104) -> Result<TileResponse, FcgiError> {
105 let req_path = request_params.req_path;
106 let metrics = request_params.metrics;
107 let tracer = global::tracer("request");
109 let span = tracer.start("wms_fcgi_req");
110 let ctx = Context::current_with_span(span);
111 let (fcgino, pool) = fcgi_dispatcher.select(fcgi_query);
114 let available_clients = pool.status().available;
115
116 metrics
118 .wms_requests_counter
119 .with_label_values(&[
120 req_path,
121 fcgi_dispatcher.backend_name(),
122 &fcgino.to_string(),
123 ])
124 .inc();
125 ctx.span().set_attributes([
126 KeyValue::new("project", project.to_string()),
127 KeyValue::new("fcgino", Value::I64(fcgino as i64)),
128 ]);
129 let span = tracer.start_with_context("fcgi_wait", &ctx);
133 let ctx2 = Context::current_with_span(span);
134 let fcgi_client_start = SystemTime::now();
137 let fcgi_client = pool.get().await;
138 let fcgi_client_wait_elapsed = fcgi_client_start.elapsed();
139
140 ctx2.span().set_attribute(KeyValue::new(
142 "available_clients",
143 Value::I64(available_clients as i64),
144 ));
145 drop(ctx2);
146 metrics.fcgi_client_pool_available[fcgino]
147 .with_label_values(&[fcgi_dispatcher.backend_name()])
148 .set(available_clients as i64);
149 if let Ok(elapsed) = fcgi_client_wait_elapsed {
150 let duration =
151 (elapsed.as_secs() as f64) + f64::from(elapsed.subsec_nanos()) / 1_000_000_000_f64;
152 metrics.fcgi_client_wait_seconds[fcgino]
153 .with_label_values(&[fcgi_dispatcher.backend_name()])
154 .observe(duration);
155 }
156 let mut fcgi_client = match fcgi_client {
159 Ok(fcgi) => fcgi,
160 Err(_) => {
161 warn!("FCGI client timeout");
162 return Err(FcgiError::FcgiTimeout);
163 }
164 };
165
166 let span = tracer.start_with_context("wms_fcgi", &ctx);
168 let ctx2 = Context::current_with_span(span);
169 let host_port: Vec<&str> = request_params.host.split(':').collect();
172 debug!("Forwarding query to FCGI process {fcgino}: {fcgi_query}");
173 let len = format!("{}", body.len());
174 let mut params = fastcgi_client::Params::new()
175 .set_request_method(req_method)
176 .set_request_uri(req_path)
177 .set_server_name(host_port.first().unwrap_or(&""))
178 .set_query_string(fcgi_query)
179 .set_content_length(&len);
180 if let Some(port) = host_port.get(1) {
181 params = params.set_server_port(port);
182 }
183 if request_params.scheme == "https" {
184 params.insert("HTTPS", "ON");
185 }
186 let fcgi_start = SystemTime::now();
189 let body = body.as_bytes();
190 let output = fcgi_client.do_request(¶ms, &mut &body[..]);
191 if let Err(ref e) = output {
192 warn!("FCGI error: {e}");
193 fcgi_dispatcher.remove(fcgi_client);
195 return Err(FcgiError::FcgiRequestError);
196 }
197 let fcgiout = output.unwrap().get_stdout().unwrap();
198
199 let mut cursor = Cursor::new(fcgiout);
200 let mut response = TileResponse::new();
202 let mut line = String::new();
203 while let Ok(_bytes) = cursor.read_line(&mut line) {
204 let len = line.trim_end_matches(&['\r', '\n'][..]).len();
206 line.truncate(len);
207 if len == 0 {
208 break;
209 }
210 let parts: Vec<&str> = line.splitn(2, ": ").collect();
211 if parts.len() != 2 {
213 warn!("Invalid FCGI-Header received: {line}");
214 break;
215 }
216 let (key, value) = (parts[0], parts[1]);
217 match key {
218 "Content-Type" => {
219 response.set_content_type(value.to_string());
220 }
221 "Content-Length" | "Server" => {} "X-us" => {
223 let us: u64 = value.parse().expect("u64 value");
225 let _span = tracer
226 .span_builder("fcgi")
227 .with_kind(SpanKind::Internal)
228 .with_start_time(fcgi_start)
229 .with_end_time(fcgi_start + Duration::from_micros(us))
230 .start_with_context(&tracer, &ctx2);
231 response.insert_header(("Server-Timing", format!("wms-backend;dur={}", us / 1000)));
235 }
236 "X-trace" => { }
238 "X-metrics" => {
239 for entry in value.split(',') {
241 let keyval: Vec<&str> = entry.splitn(2, ':').collect();
242 match keyval[0] {
243 "cache_count" => metrics.fcgi_cache_count[fcgino]
244 .with_label_values(&[fcgi_dispatcher.backend_name()])
245 .set(i64::from_str(keyval[1]).expect("i64 value")),
246 "cache_hit" => metrics.fcgi_cache_hit[fcgino]
247 .with_label_values(&[fcgi_dispatcher.backend_name()])
248 .set(i64::from_str(keyval[1]).expect("i64 value")),
249 "cache_miss" => { }
250 _ => debug!("Ignoring metric entry {entry}"),
251 }
252 }
253 }
254 _ => debug!("Ignoring FCGI-Header: {line}"),
255 }
256 line.truncate(0);
257 }
258
259 drop(ctx2);
261 Ok(response.with_body(Box::new(cursor)))
264}
265
266impl ServiceEndpoints for MapService {
267 fn register_endpoints(&self, cfg: &mut web::ServiceConfig) {
268 cfg.app_data(web::Data::new(self.metrics().clone()));
269
270 cfg.app_data(web::Data::new(self.inventory.clone()));
271
272 for fcgi_client in &self.fcgi_clients {
273 for suffix_info in &fcgi_client.suffixes {
274 let route = suffix_info.url_base.trim_end_matches('/').to_string();
275 let suffix = suffix_info.suffix.clone();
276 info!("Registering WMS endpoint {route}/ (suffix: {suffix})");
277 cfg.service(
278 web::resource(route + "/{project:.+}") .app_data(fcgi_client.clone())
280 .app_data(web::Data::new(suffix))
281 .route(
282 web::route()
283 .guard(guard::Any(guard::Get()).or(guard::Post()))
284 .to(wms_fcgi),
285 ),
286 );
287 }
288 }
289 }
290}