1use crate::app::Config;
2use crate::tracing;
3use crate::session_manager::RustBasicSessionStore;
4use crate::router::{Router, Response};
5use crate::requests::Request;
6use std::net::SocketAddr;
7use crate::sql::AnyPool;
8use std::sync::Arc;
9use std::convert::Infallible;
10use tokio::net::TcpListener;
11use hyper::service::service_fn;
12use hyper_util::rt::TokioIo;
13use hyper::server::conn::http1;
14use crate::rand::distr::SampleString;
15#[cfg(feature = "websocket")]
16use futures_util::{SinkExt, StreamExt};
17#[cfg(feature = "websocket")]
18use tokio_tungstenite::tungstenite::Message;
19
20#[derive(Clone)]
21pub struct AppState {
22 pub db: AnyPool,
23 pub config: Arc<Config>,
24}
25
26static EMBEDDED_PUBLIC_GET: std::sync::OnceLock<fn(&str) -> Option<crate::rust_embed::EmbeddedFile>> = std::sync::OnceLock::new();
27
28pub fn set_embedded_public(f: fn(&str) -> Option<crate::rust_embed::EmbeddedFile>) {
29 EMBEDDED_PUBLIC_GET.set(f).ok();
30}
31
32fn guess_mime(path: &str) -> &'static str {
33 if path.ends_with(".js") {
34 "application/javascript"
35 } else if path.ends_with(".css") {
36 "text/css"
37 } else if path.ends_with(".html") {
38 "text/html"
39 } else if path.ends_with(".png") {
40 "image/png"
41 } else if path.ends_with(".jpg") || path.ends_with(".jpeg") {
42 "image/jpeg"
43 } else if path.ends_with(".svg") {
44 "image/svg+xml"
45 } else if path.ends_with(".ico") {
46 "image/x-icon"
47 } else if path.ends_with(".json") {
48 "application/json"
49 } else if path.ends_with(".woff") {
50 "font/woff"
51 } else if path.ends_with(".woff2") {
52 "font/woff2"
53 } else {
54 "application/octet-stream"
55 }
56}
57
58pub async fn start_server(
59 cfg: Config,
60 session_store: RustBasicSessionStore,
61 db: AnyPool,
62 app_router: Router<AppState>,
63) {
64 let mut routes_map = std::collections::HashMap::new();
66 for r in &app_router.routes {
67 if let Some(ref name) = r.name {
68 routes_map.insert(name.clone(), r.path.clone());
69 }
70 }
71 let _ = crate::router::NAMED_ROUTES.set(routes_map);
72
73 kill_port_if_in_use(cfg.app_port);
75
76 unsafe {
78 std::env::set_var("TZ", &cfg.app_timezone);
79 }
80
81 let state = AppState {
83 db,
84 config: Arc::new(cfg.clone()),
85 };
86
87 let addr_str = format!("{}:{}", cfg.app_host, cfg.app_port);
89 let addr: SocketAddr = addr_str.parse().expect("Alamat server tidak valid");
90
91 tracing::info!("{} berjalan di: http://{}", cfg.app_name, addr);
92 tracing::info!("WebSockets enabled: {}", cfg.websocket_enabled);
93
94 let listener = TcpListener::bind(addr).await.unwrap();
96
97 loop {
98 let (stream, peer_addr) = match listener.accept().await {
99 Ok(ok) => ok,
100 Err(_) => continue,
101 };
102
103 let _ = stream.set_nodelay(true);
105
106 let io = TokioIo::new(stream);
107 let state = state.clone();
108 let router = app_router.clone();
109 let peer_ip = peer_addr.ip().to_string();
110 let session_store = session_store.clone();
111
112 tokio::task::spawn(async move {
113 let service = service_fn(move |req: hyper::Request<hyper::body::Incoming>| {
114 let state = state.clone();
115 let router = router.clone();
116 let peer_ip = peer_ip.clone();
117 let session_store = session_store.clone();
118 async move {
119 let res = handle_http_request(req, peer_ip, state, router, session_store).await;
120 Ok::<_, Infallible>(res)
121 }
122 });
123
124 if let Err(err) = http1::Builder::new()
125 .serve_connection(io, service)
126 .with_upgrades()
127 .await
128 {
129 tracing::debug!("Error serving connection: {:?}", err);
130 }
131 });
132 }
133}
134
135pub(crate) fn match_path(route_path: &str, req_path: &str) -> bool {
136 let r_parts: Vec<&str> = route_path.split('/').filter(|s| !s.is_empty()).collect();
137 let q_parts: Vec<&str> = req_path.split('/').filter(|s| !s.is_empty()).collect();
138
139 if r_parts.len() != q_parts.len() {
140 return false;
141 }
142
143 for (r, q) in r_parts.iter().zip(q_parts.iter()) {
144 if r.starts_with(':') || (r.starts_with('{') && r.ends_with('}')) {
145 continue;
146 }
147 if r != q {
148 return false;
149 }
150 }
151 true
152}
153
154pub(crate) fn extract_params(route_path: &str, req_path: &str) -> std::collections::HashMap<String, String> {
157 let mut params = std::collections::HashMap::new();
158 let r_parts: Vec<&str> = route_path.split('/').filter(|s| !s.is_empty()).collect();
159 let q_parts: Vec<&str> = req_path.split('/').filter(|s| !s.is_empty()).collect();
160
161 for (r, q) in r_parts.iter().zip(q_parts.iter()) {
162 if r.starts_with('{') && r.ends_with('}') {
163 let key = &r[1..r.len() - 1];
165 params.insert(key.to_string(), q.to_string());
166 } else if r.starts_with(':') {
167 let key = &r[1..];
169 params.insert(key.to_string(), q.to_string());
170 }
171 }
172 params
173}
174
175async fn serve_static_or_404(path: &str, state: &AppState) -> Response {
176 let clean_path = path.trim_start_matches('/');
177 let file_path = if clean_path.is_empty() { "index.html" } else { clean_path };
178
179 if state.config.app_debug {
180 let disk_path = std::path::Path::new("public").join(file_path);
181 if disk_path.exists() && disk_path.is_file() {
182 if let Ok(content) = std::fs::read(&disk_path) {
183 let mime = guess_mime(file_path);
184 return http::Response::builder()
185 .header(http::header::CONTENT_TYPE, mime)
186 .body(content)
187 .unwrap();
188 }
189 }
190 } else {
191 if let Some(file) = EMBEDDED_PUBLIC_GET.get().and_then(|f| f(file_path)) {
192 let mime = guess_mime(file_path);
193 return http::Response::builder()
194 .header(http::header::CONTENT_TYPE, mime)
195 .body(file.data.to_vec())
196 .unwrap();
197 }
198 }
199
200 crate::errors::ErrorController::not_found().await
201}
202
203async fn handle_http_request(
204 #[allow(unused_mut)] mut hyper_req: hyper::Request<hyper::body::Incoming>,
205 peer_ip: String,
206 state: AppState,
207 router: Router<AppState>,
208 session_store: RustBasicSessionStore,
209) -> hyper::Response<http_body_util::Full<hyper::body::Bytes>> {
210 use http_body_util::BodyExt;
211
212 let path_str = hyper_req.uri().path().to_string();
214 if path_str == "/ws" {
215 #[cfg(feature = "websocket")]
216 {
217 if !state.config.websocket_enabled {
218 return hyper::Response::builder()
219 .status(http::StatusCode::NOT_FOUND)
220 .body(http_body_util::Full::new(hyper::body::Bytes::from("WebSockets are disabled")))
221 .unwrap();
222 }
223
224 if hyper_tungstenite::is_upgrade_request(&hyper_req) {
225 match hyper_tungstenite::upgrade(&mut hyper_req, None) {
226 Ok((response, websocket)) => {
227 tokio::spawn(async move {
228 handle_websocket_connection(websocket).await;
229 });
230 let (parts, _) = response.into_parts();
231 return hyper::Response::from_parts(parts, http_body_util::Full::new(hyper::body::Bytes::new()));
232 }
233 Err(e) => {
234 tracing::error!("Gagal mengupgrade koneksi WebSocket: {:?}", e);
235 }
236 }
237 }
238 }
239 #[cfg(not(feature = "websocket"))]
240 {
241 return hyper::Response::builder()
242 .status(http::StatusCode::NOT_FOUND)
243 .body(http_body_util::Full::new(hyper::body::Bytes::from("WebSockets feature not compiled")))
244 .unwrap();
245 }
246 }
247
248 let (parts, body) = hyper_req.into_parts();
249 let method = parts.method.clone();
250 let uri = parts.uri.clone();
251 let path = uri.path().to_string();
252
253 let mut headers = std::collections::HashMap::new();
254 for (name, val) in parts.headers.iter() {
255 if let Ok(val_str) = val.to_str() {
256 headers.insert(name.as_str().to_lowercase(), val_str.to_string());
257 }
258 }
259
260 let mut inputs = serde_json::json!({});
261 if let Some(query) = uri.query() {
262 if let Ok(params) = crate::serde_urlencoded::from_str::<std::collections::HashMap<String, String>>(query) {
263 for (k, v) in params {
264 inputs[k] = serde_json::json!(v);
265 }
266 }
267 }
268
269 let body_bytes = body.collect().await.map(|c| c.to_bytes()).unwrap_or_default();
270 let content_type = headers.get("content-type").map(|s| s.as_str()).unwrap_or("");
271 if content_type.starts_with("application/json") {
272 if let Ok(json_val) = serde_json::from_slice::<serde_json::Value>(&body_bytes) {
273 if let serde_json::Value::Object(obj) = json_val {
274 for (k, v) in obj {
275 inputs[k] = v;
276 }
277 }
278 }
279 } else if content_type.starts_with("application/x-www-form-urlencoded") {
280 if let Ok(params) = crate::serde_urlencoded::from_bytes::<std::collections::HashMap<String, String>>(&body_bytes) {
281 for (k, v) in params {
282 inputs[k] = serde_json::json!(v);
283 }
284 }
285 }
286
287 let mut session_id = None;
288 if let Some(cookie_header) = headers.get("cookie") {
289 for cookie in cookie_header.split(';') {
290 let parts: Vec<&str> = cookie.split('=').map(|s| s.trim()).collect();
291 if parts.len() == 2 && parts[0] == "rustbasic_session" {
292 session_id = Some(parts[1].to_string());
293 break;
294 }
295 }
296 }
297
298 let id = session_id.unwrap_or_else(|| {
299 crate::rand::distr::Alphanumeric.sample_string(&mut crate::rand::rng(), 40)
300 });
301
302 let session_data = if let Some(payload_str) = session_store.load(&id).await {
303 serde_json::from_str::<serde_json::Map<String, serde_json::Value>>(&payload_str).unwrap_or_default()
304 } else {
305 serde_json::Map::new()
306 };
307
308 let session = crate::session::Session::new(id.clone());
309 *session.data.lock().unwrap() = session_data;
310
311 if session.get::<String>("_token").is_none() {
312 let new_token = crate::rand::distr::Alphanumeric.sample_string(&mut crate::rand::rng(), 40);
313 session.set("_token", new_token);
314 }
315
316 let req = Request {
317 inputs,
318 method: method.clone(),
319 path: path.clone(),
320 headers,
321 session: session.clone(),
322 state: state.clone(),
323 ip_address: peer_ip,
324 params: std::collections::HashMap::new(), };
326
327 struct RouteDispatcher {
328 router: Router<AppState>,
329 state: AppState,
330 }
331
332 #[crate::async_trait]
333 impl crate::router::ErasedHandler for RouteDispatcher {
334 async fn call(&self, req: Request) -> Response {
335 let method = req.method.clone();
336 let path = req.path.clone();
337
338 let mut matched_handler = None;
339 let mut matched_params = std::collections::HashMap::new();
340 for route in &self.router.routes {
341 if match_path(&route.path, &path) {
342 for (m, h) in &route.handlers {
343 if m == &method {
344 matched_handler = Some(h.clone());
345 matched_params = extract_params(&route.path, &path);
346 break;
347 }
348 }
349 }
350 if matched_handler.is_some() {
351 break;
352 }
353 }
354
355 if let Some(handler) = matched_handler {
356 let mut req = req;
358 req.params = matched_params;
359 let mut chain = std::sync::Arc::new(crate::middleware::MiddlewareChain::End(handler));
360 for mw in self.router.middlewares.iter().rev() {
361 chain = std::sync::Arc::new(crate::middleware::MiddlewareChain::Next(mw.clone(), chain));
362 }
363 chain.next(req).await
364 } else {
365 serve_static_or_404(&path, &self.state).await
366 }
367 }
368 }
369
370 let dispatcher = std::sync::Arc::new(RouteDispatcher {
371 router,
372 state: state.clone(),
373 });
374
375 let mut chain = std::sync::Arc::new(crate::middleware::MiddlewareChain::End(dispatcher));
376 chain = std::sync::Arc::new(crate::middleware::MiddlewareChain::Next(
377 crate::middleware::from_fn(crate::middleware::security_headers::security_headers_middleware),
378 chain,
379 ));
380 chain = std::sync::Arc::new(crate::middleware::MiddlewareChain::Next(
381 crate::middleware::from_fn(crate::middleware::logging::logging_middleware),
382 chain,
383 ));
384
385 let ip = req.ip_address.clone();
386 let res = chain.next(req).await;
387
388 let final_session_data = session.data.lock().unwrap().clone();
389 if let Ok(session_json) = serde_json::to_string(&final_session_data) {
390 session_store.store(&id, &session_json, &ip).await;
391 }
392
393 let (mut res_parts, res_body) = res.into_parts();
394 let cookie_val = format!("rustbasic_session={}; Path=/; HttpOnly; SameSite=Lax", id);
395 res_parts.headers.insert(
396 http::header::SET_COOKIE,
397 http::HeaderValue::from_str(&cookie_val).unwrap(),
398 );
399
400 hyper::Response::from_parts(res_parts, http_body_util::Full::new(hyper::body::Bytes::from(res_body)))
401}
402
403fn kill_port_if_in_use(_port: u16) {
404 #[cfg(any(target_os = "macos", target_os = "linux", target_os = "windows"))]
405 use std::process::Command;
406 #[cfg(any(target_os = "macos", target_os = "linux", target_os = "windows"))]
407 let port = _port;
408 #[cfg(target_os = "macos")]
409 {
410 let output = Command::new("lsof")
411 .arg("-t")
412 .arg(format!("-i:{}", port))
413 .output();
414
415 if let Ok(out) = output {
416 let pid_str = String::from_utf8_lossy(&out.stdout).trim().to_string();
417 if !pid_str.is_empty() {
418 tracing::warn!("Port {} sedang digunakan oleh PID {}. Membunuh proses...", port, pid_str);
419
420 for pid in pid_str.split('\n') {
421 if !pid.is_empty() {
422 let _ = Command::new("kill")
423 .arg("-9")
424 .arg(pid)
425 .output();
426 }
427 }
428
429 std::thread::sleep(std::time::Duration::from_millis(500));
430 }
431 }
432 }
433
434 #[cfg(target_os = "linux")]
435 {
436 let _ = Command::new("fuser")
437 .arg("-k")
438 .arg(format!("{}/tcp", port))
439 .output();
440 }
441
442 #[cfg(target_os = "windows")]
443 {
444 let output = Command::new("cmd")
445 .args(&["/C", &format!("netstat -ano | findstr :{}", port)])
446 .output();
447
448 if let Ok(out) = output {
449 let stdout = String::from_utf8_lossy(&out.stdout);
450 let mut found = false;
451 for line in stdout.lines() {
452 let parts: Vec<&str> = line.split_whitespace().collect();
453 if let Some(pid) = parts.last() {
454 if pid.parse::<u32>().is_ok() {
455 tracing::warn!("Port {} sedang digunakan oleh PID {}. Membunuh proses...", port, pid);
456 let _ = Command::new("taskkill")
457 .args(&["/F", "/PID", pid])
458 .output();
459 found = true;
460 }
461 }
462 }
463 if found {
464 std::thread::sleep(std::time::Duration::from_millis(500));
465 }
466 }
467 }
468}
469
470#[cfg(feature = "websocket")]
471async fn handle_websocket_connection(ws_stream: hyper_tungstenite::HyperWebsocket) {
472 let mut ws = match ws_stream.await {
473 Ok(w) => w,
474 Err(e) => {
475 crate::tracing::error!("WebSocket handshake failed: {:?}", e);
476 return;
477 }
478 };
479
480 let state = crate::support::broadcaster::Broadcaster::state();
481 let conn_id = state.next_conn_id();
482 let (tx, mut rx) = tokio::sync::mpsc::unbounded_channel::<String>();
483 let mut subscribed_channels = Vec::new();
484
485 loop {
486 tokio::select! {
487 incoming = ws.next() => {
488 let msg = match incoming {
489 Some(Ok(m)) => m,
490 Some(Err(_)) | None => break,
491 };
492
493 if msg.is_text() {
494 let text = msg.to_text().unwrap_or("");
495 if let Ok(val) = serde_json::from_str::<serde_json::Value>(text) {
496 if let Some(action) = val.get("action").and_then(|a| a.as_str()) {
497 if let Some(channel) = val.get("channel").and_then(|c| c.as_str()) {
498 match action {
499 "subscribe" => {
500 let session = crate::support::broadcaster::ClientSession {
501 id: conn_id,
502 tx: tx.clone(),
503 };
504 state.subscribe(channel, session).await;
505 subscribed_channels.push(channel.to_string());
506 }
507 "unsubscribe" => {
508 state.unsubscribe(channel, conn_id).await;
509 subscribed_channels.retain(|c| c != channel);
510 }
511 "broadcast" => {
512 if let Some(event) = val.get("event").and_then(|e| e.as_str()) {
513 if let Some(data) = val.get("data") {
514 let msg = serde_json::json!({
515 "event": event,
516 "channel": channel,
517 "data": data
518 });
519 if let Ok(msg_str) = serde_json::to_string(&msg) {
520 let channels = state.channels.read().await;
521 if let Some(sessions) = channels.get(channel) {
522 for session in sessions {
523 if session.id != conn_id {
524 let _ = session.tx.send(msg_str.clone());
525 }
526 }
527 }
528 }
529 }
530 }
531 }
532 _ => {}
533 }
534 }
535 }
536 }
537 } else if msg.is_close() {
538 break;
539 }
540 }
541 outgoing = rx.recv() => {
542 let text = match outgoing {
543 Some(t) => t,
544 None => break,
545 };
546 if ws.send(Message::Text(text.into())).await.is_err() {
547 break;
548 }
549 }
550 }
551 }
552
553 for channel in subscribed_channels {
554 state.unsubscribe(&channel, conn_id).await;
555 }
556}