1use crate::cache::Cache;
2use crate::config::{Config, ServerConfig};
3use crate::container::App;
4use crate::http::{HttpResponse, Request};
5use crate::middleware::{Middleware, MiddlewareChain, MiddlewareRegistry};
6use crate::routing::Router;
7use crate::websocket::handle_ws_upgrade;
8use bytes::Bytes;
9use http_body_util::Full;
10use hyper::server::conn::http1;
11use hyper::service::service_fn;
12use hyper_util::rt::TokioIo;
13use std::convert::Infallible;
14use std::net::SocketAddr;
15use std::sync::Arc;
16use tokio::net::TcpListener;
17
/// Boxed callback that is offered WebSocket upgrade requests before any
/// other handling takes place.
///
/// Returning `Ok(response)` answers the request with that response.
/// Returning `Err(request)` hands the request back so that normal request
/// handling continues with it (see `handle_request`).
type WsInterceptor = Box<
    dyn Fn(
            hyper::Request<hyper::body::Incoming>,
        ) -> Result<hyper::Response<Full<Bytes>>, hyper::Request<hyper::body::Incoming>>
        + Send
        + Sync,
>;
30
/// HTTP/1 server that dispatches incoming requests through a [`Router`].
///
/// Built with [`Server::new`] or [`Server::from_config`], customised with the
/// builder-style methods, and started with [`Server::run`].
pub struct Server {
    /// Route table, shared with every connection task.
    router: Arc<Router>,
    /// Registry whose global middleware is added to every handler chain.
    middleware: MiddlewareRegistry,
    /// Address to bind to; parsed as an IP address when the server starts.
    host: String,
    /// TCP port to bind to.
    port: u16,
    /// Optional callback offered WebSocket upgrade requests first.
    ws_interceptor: Option<Arc<WsInterceptor>>,
}
39
40impl Server {
41 pub fn new(router: impl Into<Router>) -> Self {
43 Self {
44 router: Arc::new(router.into()),
45 middleware: MiddlewareRegistry::new(),
46 host: "127.0.0.1".to_string(),
47 port: 8080,
48 ws_interceptor: None,
49 }
50 }
51
52 pub fn from_config(router: impl Into<Router>) -> Self {
54 App::init();
56
57 App::boot_services();
59
60 let config = Config::get::<ServerConfig>().unwrap_or_else(ServerConfig::from_env);
61 Self {
62 router: Arc::new(router.into()),
63 middleware: MiddlewareRegistry::from_global(),
65 host: config.host,
66 port: config.port,
67 ws_interceptor: None,
68 }
69 }
70
71 pub fn ws_interceptor<F>(mut self, handler: F) -> Self
92 where
93 F: Fn(
94 hyper::Request<hyper::body::Incoming>,
95 )
96 -> Result<hyper::Response<Full<Bytes>>, hyper::Request<hyper::body::Incoming>>
97 + Send
98 + Sync
99 + 'static,
100 {
101 self.ws_interceptor = Some(Arc::new(Box::new(handler)));
102 self
103 }
104
105 pub fn middleware<M: Middleware + 'static>(mut self, middleware: M) -> Self {
119 self.middleware = self.middleware.append(middleware);
120 self
121 }
122
123 pub fn host(mut self, host: &str) -> Self {
125 self.host = host.to_string();
126 self
127 }
128
129 pub fn port(mut self, port: u16) -> Self {
131 self.port = port;
132 self
133 }
134
135 fn get_addr(&self) -> SocketAddr {
136 SocketAddr::new(self.host.parse().unwrap(), self.port)
137 }
138
139 pub async fn run(self) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
141 Cache::bootstrap().await;
143
144 let addr: SocketAddr = self.get_addr();
145 let listener = TcpListener::bind(addr).await?;
146
147 println!("Ferro server running on http://{addr}");
148
149 let router = self.router;
150 let middleware = Arc::new(self.middleware);
151 let ws_interceptor = self.ws_interceptor;
152
153 loop {
154 let (stream, _) = listener.accept().await?;
155 let io = TokioIo::new(stream);
156 let router = router.clone();
157 let middleware = middleware.clone();
158 let ws_interceptor = ws_interceptor.clone();
159
160 tokio::spawn(async move {
161 let service = service_fn(move |req: hyper::Request<hyper::body::Incoming>| {
162 let router = router.clone();
163 let middleware = middleware.clone();
164 let ws_interceptor = ws_interceptor.clone();
165 async move {
166 Ok::<_, Infallible>(
167 handle_request(router, middleware, ws_interceptor, req).await,
168 )
169 }
170 });
171
172 if let Err(err) = http1::Builder::new()
173 .serve_connection(io, service)
174 .with_upgrades()
175 .await
176 {
177 eprintln!("Error serving connection: {err:?}");
178 }
179 });
180 }
181 }
182}
183
184async fn handle_request(
185 router: Arc<Router>,
186 middleware_registry: Arc<MiddlewareRegistry>,
187 ws_interceptor: Option<Arc<WsInterceptor>>,
188 mut req: hyper::Request<hyper::body::Incoming>,
189) -> hyper::Response<Full<Bytes>> {
190 if let Some(ref interceptor) = ws_interceptor {
192 if hyper_tungstenite::is_upgrade_request(&req) {
193 match interceptor(req) {
194 Ok(response) => return response,
195 Err(returned_req) => {
196 req = returned_req;
198 }
199 }
200 }
201 }
202
203 let method = req.method().clone();
204 let path = req.uri().path().to_string();
205 let query = req.uri().query().unwrap_or("");
206
207 if path == "/_ferro/ws" && hyper_tungstenite::is_upgrade_request(&req) {
209 return handle_ws_upgrade(req);
210 }
211
212 if path.starts_with("/_ferro/") && method == hyper::Method::GET {
215 return match path.as_str() {
216 "/_ferro/health" => health_response(query).await,
217 "/_ferro/routes" => crate::debug::handle_routes(),
218 "/_ferro/middleware" => crate::debug::handle_middleware(),
219 "/_ferro/services" => crate::debug::handle_services(),
220 "/_ferro/metrics" => crate::debug::handle_metrics(),
221 "/_ferro/queue/jobs" => crate::debug::handle_queue_jobs().await,
222 "/_ferro/queue/stats" => crate::debug::handle_queue_stats().await,
223 "/_ferro/ferro-base.css" => {
224 #[cfg(feature = "json-ui")]
225 {
226 serve_ferro_base_css()
227 }
228 #[cfg(not(feature = "json-ui"))]
229 {
230 HttpResponse::text("404 Not Found").status(404).into_hyper()
231 }
232 }
233 _ => HttpResponse::text("404 Not Found").status(404).into_hyper(),
234 };
235 }
236
237 let pre_route = crate::middleware::get_pre_route_middleware();
239 for hook in &pre_route {
240 match hook.handle(req).await {
241 Ok(rewritten) => req = rewritten,
242 Err(response) => return response,
243 }
244 }
245
246 let method = req.method().clone();
247 let path = req.uri().path().to_string();
248
249 let ferro_request = Request::new(req);
250 let routing_path = path.clone();
251
252 let request_host = ferro_request
254 .header("host")
255 .unwrap_or_default()
256 .split(':')
257 .next()
258 .unwrap_or("")
259 .to_ascii_lowercase();
260
261 let response = match router.match_route(&method, &routing_path) {
262 Some((handler, params, route_pattern)) => {
263 let request = ferro_request
264 .with_params(params)
265 .with_route_pattern(route_pattern.clone());
266
267 let mut chain = MiddlewareChain::new();
269
270 chain.extend(middleware_registry.global_middleware().iter().cloned());
272
273 let route_middleware = router.get_route_middleware(&route_pattern);
275 chain.extend(route_middleware);
276
277 let response = crate::http::request_context::REQUEST_HOST
279 .scope(request_host, chain.execute(request, handler))
280 .await;
281
282 let http_response = response.unwrap_or_else(|e| e);
284 http_response.into_hyper()
285 }
286 None => {
287 if method == hyper::Method::GET || method == hyper::Method::HEAD {
289 if let Some(response) =
290 crate::static_files::try_serve_static_file(&routing_path).await
291 {
292 return response;
293 }
294 }
295
296 if let Some((fallback_handler, fallback_middleware)) = router.get_fallback() {
298 let request = ferro_request.with_params(std::collections::HashMap::new());
299
300 let mut chain = MiddlewareChain::new();
302
303 chain.extend(middleware_registry.global_middleware().iter().cloned());
305
306 chain.extend(fallback_middleware);
308
309 let response = chain.execute(request, fallback_handler).await;
311
312 let http_response = response.unwrap_or_else(|e| e);
314 http_response.into_hyper()
315 } else {
316 HttpResponse::text("404 Not Found").status(404).into_hyper()
318 }
319 }
320 };
321
322 response
323}
324
325async fn health_response(query: &str) -> hyper::Response<Full<Bytes>> {
329 use chrono::Utc;
330 use serde_json::json;
331
332 let timestamp = Utc::now().to_rfc3339();
333 let check_db = query.contains("db=true");
334
335 let mut response = json!({
336 "status": "ok",
337 "timestamp": timestamp
338 });
339
340 if check_db {
341 match check_database_health().await {
343 Ok(_) => {
344 response["database"] = json!("connected");
345 }
346 Err(e) => {
347 response["database"] = json!("error");
348 response["database_error"] = json!(e);
349 }
350 }
351 }
352
353 let body =
354 serde_json::to_string(&response).unwrap_or_else(|_| r#"{"status":"ok"}"#.to_string());
355
356 hyper::Response::builder()
357 .status(200)
358 .header("Content-Type", "application/json")
359 .body(Full::new(Bytes::from(body)))
360 .unwrap()
361}
362
/// Serves the stylesheet embedded in `ferro_json_ui::FERRO_BASE_CSS`.
///
/// The response has status 200, a `text/css` content type, an explicit
/// content length and a one-year immutable cache policy.
#[cfg(feature = "json-ui")]
fn serve_ferro_base_css() -> hyper::Response<Full<Bytes>> {
    let stylesheet = ferro_json_ui::FERRO_BASE_CSS;
    let content_length = stylesheet.len().to_string();
    let payload = Full::new(Bytes::from_static(stylesheet.as_bytes()));

    let builder = hyper::Response::builder()
        .status(200)
        .header("Content-Type", "text/css; charset=utf-8")
        .header("Content-Length", content_length)
        .header("Cache-Control", "public, max-age=31536000, immutable");

    builder.body(payload).unwrap()
}
379
380async fn check_database_health() -> Result<(), String> {
382 use crate::database::DB;
383 use sea_orm::ConnectionTrait;
384
385 if !DB::is_connected() {
386 return Err("Database not initialized".to_string());
387 }
388
389 let conn = DB::connection().map_err(|e| e.to_string())?;
390
391 conn.inner()
393 .execute_unprepared("SELECT 1")
394 .await
395 .map_err(|e| format!("Database query failed: {e}"))?;
396
397 Ok(())
398}
399
/// Tests for the embedded stylesheet response; built only with the `json-ui`
/// feature, which provides `serve_ferro_base_css`.
#[cfg(all(test, feature = "json-ui"))]
mod ferro_base_css_route_tests {
    use super::*;
    use http_body_util::BodyExt;

    /// The status and the three headers set by `serve_ferro_base_css` have
    /// the expected values.
    #[tokio::test]
    async fn serve_ferro_base_css_returns_200_with_text_css_content_type() {
        let resp = serve_ferro_base_css();
        assert_eq!(resp.status(), 200, "expected 200 OK");

        let headers = resp.headers();

        let content_type = headers
            .get("Content-Type")
            .expect("Content-Type header missing")
            .to_str()
            .unwrap();
        assert_eq!(content_type, "text/css; charset=utf-8");

        let cache_control = headers
            .get("Cache-Control")
            .expect("Cache-Control header missing")
            .to_str()
            .unwrap();
        assert_eq!(cache_control, "public, max-age=31536000, immutable");

        let declared_len = headers
            .get("Content-Length")
            .expect("Content-Length header missing")
            .to_str()
            .unwrap()
            .parse::<usize>()
            .expect("Content-Length must be an integer");
        assert_eq!(declared_len, ferro_json_ui::FERRO_BASE_CSS.len());
    }

    /// The body is exactly the embedded stylesheet and is not empty.
    #[tokio::test]
    async fn serve_ferro_base_css_body_equals_embedded_constant() {
        let collected = serve_ferro_base_css()
            .into_body()
            .collect()
            .await
            .expect("body collect");
        let payload = collected.to_bytes();

        assert_eq!(payload.as_ref(), ferro_json_ui::FERRO_BASE_CSS.as_bytes());
        assert!(!payload.is_empty());
    }
}