1use crate::app::Config;
2use crate::tracing;
3use crate::session_manager::RustBasicSessionStore;
4use crate::router::{Router, Response};
5use crate::requests::Request;
6use std::net::SocketAddr;
7use crate::sql::AnyPool;
8use std::sync::Arc;
9use std::convert::Infallible;
10use tokio::net::TcpListener;
11use hyper::service::service_fn;
12use hyper_util::rt::TokioIo;
13use hyper::server::conn::http1;
14use crate::rand::distr::SampleString;
15#[cfg(feature = "websocket")]
16use futures_util::{SinkExt, StreamExt};
17#[cfg(feature = "websocket")]
18use tokio_tungstenite::tungstenite::Message;
19
20#[derive(Clone)]
21pub struct AppState {
22 pub db: AnyPool,
23 pub config: Arc<Config>,
24}
25
26static EMBEDDED_PUBLIC_GET: std::sync::OnceLock<fn(&str) -> Option<crate::rust_embed::EmbeddedFile>> = std::sync::OnceLock::new();
27
28pub fn set_embedded_public(f: fn(&str) -> Option<crate::rust_embed::EmbeddedFile>) {
29 EMBEDDED_PUBLIC_GET.set(f).ok();
30}
31
32pub fn get_embedded_public_fn() -> Option<fn(&str) -> Option<crate::rust_embed::EmbeddedFile>> {
33 EMBEDDED_PUBLIC_GET.get().copied()
34}
35
/// Guesses the `Content-Type` of a static file from its extension.
///
/// The extension is everything after the last `.` in `path` and is compared
/// case-insensitively, so `LOGO.PNG` and `logo.png` resolve to the same type.
/// Paths without a dot, or with an unknown extension, fall back to
/// `application/octet-stream`.
fn guess_mime(path: &str) -> &'static str {
    const FALLBACK: &str = "application/octet-stream";

    let extension = match path.rsplit_once('.') {
        Some((_, ext)) => ext.to_ascii_lowercase(),
        None => return FALLBACK,
    };

    match extension.as_str() {
        "js" | "mjs" => "application/javascript",
        "css" => "text/css",
        "html" | "htm" => "text/html",
        "txt" => "text/plain",
        "png" => "image/png",
        "jpg" | "jpeg" => "image/jpeg",
        "gif" => "image/gif",
        "webp" => "image/webp",
        "svg" => "image/svg+xml",
        "ico" => "image/x-icon",
        "json" | "map" => "application/json",
        "xml" => "application/xml",
        "pdf" => "application/pdf",
        "wasm" => "application/wasm",
        "woff" => "font/woff",
        "woff2" => "font/woff2",
        "ttf" => "font/ttf",
        "otf" => "font/otf",
        _ => FALLBACK,
    }
}
61
62pub async fn start_server(
63 cfg: Config,
64 session_store: RustBasicSessionStore,
65 db: AnyPool,
66 app_router: Router<AppState>,
67) {
68 let mut routes_map = std::collections::HashMap::new();
70 for r in &app_router.routes {
71 if let Some(ref name) = r.name {
72 routes_map.insert(name.clone(), r.path.clone());
73 }
74 }
75 let _ = crate::router::NAMED_ROUTES.set(routes_map);
76
77 kill_port_if_in_use(cfg.app_port);
79
80 unsafe {
82 std::env::set_var("TZ", &cfg.app_timezone);
83 }
84
85 let state = AppState {
87 db,
88 config: Arc::new(cfg.clone()),
89 };
90
91 let addr_str = format!("{}:{}", cfg.app_host, cfg.app_port);
93 let addr: SocketAddr = addr_str.parse().expect("Alamat server tidak valid");
94
95 tracing::info!("{} berjalan di: http://{}", cfg.app_name, addr);
96 tracing::info!("WebSockets enabled: {}", cfg.websocket_enabled);
97
98 let listener = TcpListener::bind(addr).await.unwrap();
100
101 loop {
102 let (stream, peer_addr) = match listener.accept().await {
103 Ok(ok) => ok,
104 Err(_) => continue,
105 };
106
107 let _ = stream.set_nodelay(true);
109
110 let io = TokioIo::new(stream);
111 let state = state.clone();
112 let router = app_router.clone();
113 let peer_ip = peer_addr.ip().to_string();
114 let session_store = session_store.clone();
115
116 tokio::task::spawn(async move {
117 let service = service_fn(move |req: hyper::Request<hyper::body::Incoming>| {
118 let state = state.clone();
119 let router = router.clone();
120 let peer_ip = peer_ip.clone();
121 let session_store = session_store.clone();
122 async move {
123 let res = handle_http_request(req, peer_ip, state, router, session_store).await;
124 Ok::<_, Infallible>(res)
125 }
126 });
127
128 if let Err(err) = http1::Builder::new()
129 .serve_connection(io, service)
130 .with_upgrades()
131 .await
132 {
133 tracing::debug!("Error serving connection: {:?}", err);
134 }
135 });
136 }
137}
138
/// Reports whether `req_path` matches the route pattern `route_path`.
///
/// Both paths are split on `/` with empty segments ignored, so leading,
/// trailing and doubled slashes do not matter. The segment counts must be
/// equal. A pattern segment that starts with `:` or is wrapped in `{…}` is a
/// placeholder and matches any request segment; every other segment must be
/// equal to its counterpart.
pub(crate) fn match_path(route_path: &str, req_path: &str) -> bool {
    let mut pattern_segments = route_path.split('/').filter(|seg| !seg.is_empty());
    let mut request_segments = req_path.split('/').filter(|seg| !seg.is_empty());

    // Walk both paths in lock-step; running out of segments on only one side
    // means the lengths differ.
    loop {
        match (pattern_segments.next(), request_segments.next()) {
            (None, None) => return true,
            (Some(pattern), Some(actual)) => {
                let is_placeholder = pattern.starts_with(':')
                    || (pattern.starts_with('{') && pattern.ends_with('}'));
                if !is_placeholder && pattern != actual {
                    return false;
                }
            }
            _ => return false,
        }
    }
}
157
/// Collects the placeholder values of `route_path` as found in `req_path`.
///
/// Segments are paired positionally after dropping empty ones; pairing stops
/// at the shorter path. A `{name}` segment yields the key `name`, a `:name`
/// segment yields `name`; all other segments contribute nothing. When a key
/// occurs more than once, the last occurrence wins.
pub(crate) fn extract_params(route_path: &str, req_path: &str) -> std::collections::HashMap<String, String> {
    let pattern_segments = route_path.split('/').filter(|seg| !seg.is_empty());
    let request_segments = req_path.split('/').filter(|seg| !seg.is_empty());

    pattern_segments
        .zip(request_segments)
        .filter_map(|(pattern, value)| {
            // Brace form takes precedence over the colon form.
            let name = pattern
                .strip_prefix('{')
                .and_then(|rest| rest.strip_suffix('}'))
                .or_else(|| pattern.strip_prefix(':'))?;
            Some((name.to_string(), value.to_string()))
        })
        .collect()
}
177
178async fn serve_static_or_404(path: &str, state: &AppState) -> Response {
179 let clean_path = path.trim_start_matches('/');
180 let file_path = if clean_path.is_empty() { "index.html" } else { clean_path };
181
182 if state.config.app_debug {
183 let disk_path = std::path::Path::new("public").join(file_path);
184 if disk_path.exists() && disk_path.is_file()
185 && let Ok(content) = std::fs::read(&disk_path) {
186 let mime = guess_mime(file_path);
187 return http::Response::builder()
188 .header(http::header::CONTENT_TYPE, mime)
189 .body(content)
190 .unwrap();
191 }
192 } else {
193 if let Some(file) = EMBEDDED_PUBLIC_GET.get().and_then(|f| f(file_path)) {
194 let mime = guess_mime(file_path);
195 return http::Response::builder()
196 .header(http::header::CONTENT_TYPE, mime)
197 .body(file.data.to_vec())
198 .unwrap();
199 }
200 }
201
202 crate::errors::ErrorController::not_found().await
203}
204
205async fn handle_http_request(
206 #[allow(unused_mut)] mut hyper_req: hyper::Request<hyper::body::Incoming>,
207 peer_ip: String,
208 state: AppState,
209 router: Router<AppState>,
210 session_store: RustBasicSessionStore,
211) -> hyper::Response<http_body_util::Full<hyper::body::Bytes>> {
212 use http_body_util::BodyExt;
213
214 let path_str = hyper_req.uri().path().to_string();
216 if path_str == "/ws" {
217 #[cfg(feature = "websocket")]
218 {
219 if !state.config.websocket_enabled {
220 return hyper::Response::builder()
221 .status(http::StatusCode::NOT_FOUND)
222 .body(http_body_util::Full::new(hyper::body::Bytes::from("WebSockets are disabled")))
223 .unwrap();
224 }
225
226 if hyper_tungstenite::is_upgrade_request(&hyper_req) {
227 match hyper_tungstenite::upgrade(&mut hyper_req, None) {
228 Ok((response, websocket)) => {
229 tokio::spawn(async move {
230 handle_websocket_connection(websocket).await;
231 });
232 let (parts, _) = response.into_parts();
233 return hyper::Response::from_parts(parts, http_body_util::Full::new(hyper::body::Bytes::new()));
234 }
235 Err(e) => {
236 tracing::error!("Gagal mengupgrade koneksi WebSocket: {:?}", e);
237 }
238 }
239 }
240 }
241 #[cfg(not(feature = "websocket"))]
242 {
243 return hyper::Response::builder()
244 .status(http::StatusCode::NOT_FOUND)
245 .body(http_body_util::Full::new(hyper::body::Bytes::from("WebSockets feature not compiled")))
246 .unwrap();
247 }
248 }
249
250 let (parts, body) = hyper_req.into_parts();
251 let method = parts.method.clone();
252 let uri = parts.uri.clone();
253 let path = uri.path().to_string();
254
255 let mut headers = std::collections::HashMap::new();
256 for (name, val) in parts.headers.iter() {
257 if let Ok(val_str) = val.to_str() {
258 headers.insert(name.as_str().to_lowercase(), val_str.to_string());
259 }
260 }
261
262 let mut inputs = serde_json::json!({});
263 if let Some(query) = uri.query()
264 && let Ok(params) = crate::serde_urlencoded::from_str::<std::collections::HashMap<String, String>>(query) {
265 for (k, v) in params {
266 inputs[k] = serde_json::json!(v);
267 }
268 }
269
270 let body_bytes = body.collect().await.map(|c| c.to_bytes()).unwrap_or_default();
271 let content_type = headers.get("content-type").map(|s| s.as_str()).unwrap_or("");
272 if content_type.starts_with("application/json") {
273 if let Ok(json_val) = serde_json::from_slice::<serde_json::Value>(&body_bytes)
274 && let serde_json::Value::Object(obj) = json_val {
275 for (k, v) in obj {
276 inputs[k] = v;
277 }
278 }
279 } else if content_type.starts_with("application/x-www-form-urlencoded")
280 && let Ok(params) = crate::serde_urlencoded::from_bytes::<std::collections::HashMap<String, String>>(&body_bytes) {
281 for (k, v) in params {
282 inputs[k] = serde_json::json!(v);
283 }
284 }
285
286 let mut session_id = None;
287 if let Some(cookie_header) = headers.get("cookie") {
288 for cookie in cookie_header.split(';') {
289 let parts: Vec<&str> = cookie.split('=').map(|s| s.trim()).collect();
290 if parts.len() == 2 && parts[0] == "rustbasic_session" {
291 session_id = Some(parts[1].to_string());
292 break;
293 }
294 }
295 }
296
297 let id = session_id.unwrap_or_else(|| {
298 crate::rand::distr::Alphanumeric.sample_string(&mut crate::rand::rng(), 40)
299 });
300
301 let session_data = if let Some(payload_str) = session_store.load(&id).await {
302 serde_json::from_str::<serde_json::Map<String, serde_json::Value>>(&payload_str).unwrap_or_default()
303 } else {
304 serde_json::Map::new()
305 };
306
307 let session = crate::session::Session::new(id.clone());
308 *session.data.lock().unwrap() = session_data;
309
310 if session.get::<String>("_token").is_none() {
311 let new_token = crate::rand::distr::Alphanumeric.sample_string(&mut crate::rand::rng(), 40);
312 session.set("_token", new_token);
313 }
314
315 let req = Request {
316 inputs,
317 method: method.clone(),
318 path: path.clone(),
319 headers,
320 session: session.clone(),
321 state: state.clone(),
322 ip_address: peer_ip,
323 params: std::collections::HashMap::new(), };
325
326 struct RouteDispatcher {
327 router: Router<AppState>,
328 state: AppState,
329 }
330
331 #[crate::async_trait]
332 impl crate::router::ErasedHandler for RouteDispatcher {
333 async fn call(&self, req: Request) -> Response {
334 let method = req.method.clone();
335 let path = req.path.clone();
336
337 let mut matched_handler = None;
338 let mut matched_params = std::collections::HashMap::new();
339 for route in &self.router.routes {
340 if match_path(&route.path, &path) {
341 for (m, h) in &route.handlers {
342 if m == method {
343 matched_handler = Some(h.clone());
344 matched_params = extract_params(&route.path, &path);
345 break;
346 }
347 }
348 }
349 if matched_handler.is_some() {
350 break;
351 }
352 }
353
354 if let Some(handler) = matched_handler {
355 let mut req = req;
357 req.params = matched_params;
358 let mut chain = std::sync::Arc::new(crate::middleware::MiddlewareChain::End(handler));
359 for mw in self.router.middlewares.iter().rev() {
360 chain = std::sync::Arc::new(crate::middleware::MiddlewareChain::Next(mw.clone(), chain));
361 }
362 chain.next(req).await
363 } else {
364 serve_static_or_404(&path, &self.state).await
365 }
366 }
367 }
368
369 let dispatcher = std::sync::Arc::new(RouteDispatcher {
370 router,
371 state: state.clone(),
372 });
373
374 let mut chain = std::sync::Arc::new(crate::middleware::MiddlewareChain::End(dispatcher));
375 chain = std::sync::Arc::new(crate::middleware::MiddlewareChain::Next(
376 crate::middleware::from_fn(crate::middleware::security_headers::security_headers_middleware),
377 chain,
378 ));
379 chain = std::sync::Arc::new(crate::middleware::MiddlewareChain::Next(
380 crate::middleware::from_fn(crate::middleware::logging::logging_middleware),
381 chain,
382 ));
383
384 let ip = req.ip_address.clone();
385 let res = chain.next(req).await;
386
387 let final_session_data = session.data.lock().unwrap().clone();
388 if let Ok(session_json) = serde_json::to_string(&final_session_data) {
389 session_store.store(&id, &session_json, &ip).await;
390 }
391
392 let (mut res_parts, res_body) = res.into_parts();
393 let cookie_val = format!("rustbasic_session={}; Path=/; HttpOnly; SameSite=Lax", id);
394 res_parts.headers.insert(
395 http::header::SET_COOKIE,
396 http::HeaderValue::from_str(&cookie_val).unwrap(),
397 );
398
399 hyper::Response::from_parts(res_parts, http_body_util::Full::new(hyper::body::Bytes::from(res_body)))
400}
401
402fn kill_port_if_in_use(_port: u16) {
403 #[cfg(any(target_os = "macos", target_os = "linux", target_os = "windows"))]
404 use std::process::Command;
405 #[cfg(any(target_os = "macos", target_os = "linux", target_os = "windows"))]
406 let port = _port;
407 #[cfg(target_os = "macos")]
408 {
409 let output = Command::new("lsof")
410 .arg("-t")
411 .arg(format!("-i:{}", port))
412 .output();
413
414 if let Ok(out) = output {
415 let pid_str = String::from_utf8_lossy(&out.stdout).trim().to_string();
416 if !pid_str.is_empty() {
417 tracing::warn!("Port {} sedang digunakan oleh PID {}. Membunuh proses...", port, pid_str);
418
419 for pid in pid_str.split('\n') {
420 if !pid.is_empty() {
421 let _ = Command::new("kill")
422 .arg("-9")
423 .arg(pid)
424 .output();
425 }
426 }
427
428 std::thread::sleep(std::time::Duration::from_millis(500));
429 }
430 }
431 }
432
433 #[cfg(target_os = "linux")]
434 {
435 let _ = Command::new("fuser")
436 .arg("-k")
437 .arg(format!("{}/tcp", port))
438 .output();
439 }
440
441 #[cfg(target_os = "windows")]
442 {
443 let output = Command::new("cmd")
444 .args(&["/C", &format!("netstat -ano | findstr :{}", port)])
445 .output();
446
447 if let Ok(out) = output {
448 let stdout = String::from_utf8_lossy(&out.stdout);
449 let mut found = false;
450 for line in stdout.lines() {
451 let parts: Vec<&str> = line.split_whitespace().collect();
452 if let Some(pid) = parts.last() {
453 if pid.parse::<u32>().is_ok() {
454 tracing::warn!("Port {} sedang digunakan oleh PID {}. Membunuh proses...", port, pid);
455 let _ = Command::new("taskkill")
456 .args(&["/F", "/PID", pid])
457 .output();
458 found = true;
459 }
460 }
461 }
462 if found {
463 std::thread::sleep(std::time::Duration::from_millis(500));
464 }
465 }
466 }
467}
468
/// Drives one upgraded WebSocket connection until it closes.
///
/// Incoming text frames are expected to be JSON objects with an `action` and
/// a `channel` string; anything else is ignored. Supported actions:
///
/// * `subscribe` — registers this connection on `channel` (idempotent),
/// * `unsubscribe` — removes it from `channel`,
/// * `broadcast` — requires `event` and `data`; forwards
///   `{"event", "channel", "data"}` to every other connection on `channel`.
///
/// Messages queued for this connection by other clients are written out as
/// text frames. On exit every channel still subscribed is unsubscribed.
///
/// NOTE(review): `broadcast` does not check that the sender is subscribed to
/// the channel, and the outgoing queue is unbounded.
#[cfg(feature = "websocket")]
async fn handle_websocket_connection(ws_stream: hyper_tungstenite::HyperWebsocket) {
    let mut ws = match ws_stream.await {
        Ok(w) => w,
        Err(e) => {
            crate::tracing::error!("WebSocket handshake failed: {:?}", e);
            return;
        }
    };

    let state = crate::support::broadcaster::Broadcaster::state();
    let conn_id = state.next_conn_id();
    let (tx, mut rx) = tokio::sync::mpsc::unbounded_channel::<String>();
    let mut subscribed_channels: Vec<String> = Vec::new();

    loop {
        tokio::select! {
            incoming = ws.next() => {
                let msg = match incoming {
                    Some(Ok(m)) => m,
                    Some(Err(_)) | None => break,
                };

                if msg.is_text() {
                    let text = msg.to_text().unwrap_or("");
                    if let Ok(val) = serde_json::from_str::<serde_json::Value>(text)
                        && let Some(action) = val.get("action").and_then(|a| a.as_str())
                        && let Some(channel) = val.get("channel").and_then(|c| c.as_str())
                    {
                        match action {
                            "subscribe" => {
                                // Repeated subscribes must not grow the local
                                // list or register the session again.
                                if !subscribed_channels.iter().any(|c| c == channel) {
                                    let session = crate::support::broadcaster::ClientSession {
                                        id: conn_id,
                                        tx: tx.clone(),
                                    };
                                    state.subscribe(channel, session).await;
                                    subscribed_channels.push(channel.to_string());
                                }
                            }
                            "unsubscribe" => {
                                state.unsubscribe(channel, conn_id).await;
                                subscribed_channels.retain(|c| c != channel);
                            }
                            "broadcast" => {
                                if let Some(event) = val.get("event").and_then(|e| e.as_str())
                                    && let Some(data) = val.get("data")
                                {
                                    let payload = serde_json::json!({
                                        "event": event,
                                        "channel": channel,
                                        "data": data
                                    });
                                    if let Ok(msg_str) = serde_json::to_string(&payload) {
                                        let channels = state.channels.read().await;
                                        if let Some(sessions) = channels.get(channel) {
                                            for session in sessions {
                                                // Do not echo back to the sender.
                                                if session.id != conn_id {
                                                    let _ = session.tx.send(msg_str.clone());
                                                }
                                            }
                                        }
                                    }
                                }
                            }
                            _ => {}
                        }
                    }
                } else if msg.is_close() {
                    break;
                }
            }
            outgoing = rx.recv() => {
                let text = match outgoing {
                    Some(t) => t,
                    None => break,
                };
                if ws.send(Message::Text(text)).await.is_err() {
                    break;
                }
            }
        }
    }

    for channel in subscribed_channels {
        state.unsubscribe(&channel, conn_id).await;
    }
}
553
554pub async fn bootstrap<
555 M: crate::schema::MigratorTrait + Send + Sync + 'static,
556 S: crate::seeder::SeederTrait + Send + Sync + 'static,
557>(
558 args: &[String],
559 app_router: Router<AppState>,
560 embedded_templates: fn(&str) -> Option<crate::rust_embed::EmbeddedFile>,
561 embedded_public: fn(&str) -> Option<crate::rust_embed::EmbeddedFile>,
562 seeder: Option<S>,
563) {
564 let cfg = Config::load();
566
567 if args.len() > 1 && crate::cli::handle::<M, S>(
569 args,
570 &cfg,
571 seeder,
572 ).await {
573 return;
574 }
575
576 let db = crate::database::connect(&cfg).await;
578
579 crate::session::init_sessions(&cfg).await;
581 let session_store = crate::session::setup_session(&cfg).await;
582
583 crate::view::set_embedded_templates(embedded_templates);
585 set_embedded_public(embedded_public);
586
587 start_server(cfg, session_store, db, app_router).await;
589}