Skip to main content

rustbasic_core/
server.rs

1use crate::app::Config;
2use crate::tracing;
3use crate::session_manager::RustBasicSessionStore;
4use crate::router::{Router, Response};
5use crate::requests::Request;
6use std::net::SocketAddr;
7use crate::sql::AnyPool;
8use std::sync::Arc;
9use std::convert::Infallible;
10use tokio::net::TcpListener;
11use hyper::service::service_fn;
12use hyper_util::rt::TokioIo;
13use hyper::server::conn::http1;
14use crate::rand::distr::SampleString;
15#[cfg(feature = "websocket")]
16use futures_util::{SinkExt, StreamExt};
17#[cfg(feature = "websocket")]
18use tokio_tungstenite::tungstenite::Message;
19
/// Shared application state made available to request handling.
///
/// The struct derives `Clone`; `start_server` clones it for every accepted
/// connection and again for every request served on that connection.
#[derive(Clone)]
pub struct AppState {
    /// Database pool (`crate::sql::AnyPool`).
    pub db: AnyPool,
    /// Application configuration, shared behind an `Arc`.
    pub config: Arc<Config>,
}
25
26static EMBEDDED_PUBLIC_GET: std::sync::OnceLock<fn(&str) -> Option<crate::rust_embed::EmbeddedFile>> = std::sync::OnceLock::new();
27
28pub fn set_embedded_public(f: fn(&str) -> Option<crate::rust_embed::EmbeddedFile>) {
29    EMBEDDED_PUBLIC_GET.set(f).ok();
30}
31
/// Guesses the `Content-Type` of a static file from its extension.
///
/// The extension is whatever follows the last `.` in `path` and is compared
/// case-insensitively, so `logo.PNG` and `logo.png` resolve to the same type.
/// Paths without a `.` and unknown extensions fall back to
/// `application/octet-stream`.
fn guess_mime(path: &str) -> &'static str {
    const FALLBACK: &str = "application/octet-stream";
    // (extension, MIME type) pairs; extensions are stored in lowercase.
    const KNOWN_TYPES: &[(&str, &str)] = &[
        ("js", "application/javascript"),
        ("mjs", "application/javascript"),
        ("css", "text/css"),
        ("html", "text/html"),
        ("htm", "text/html"),
        ("png", "image/png"),
        ("jpg", "image/jpeg"),
        ("jpeg", "image/jpeg"),
        ("gif", "image/gif"),
        ("webp", "image/webp"),
        ("svg", "image/svg+xml"),
        ("ico", "image/x-icon"),
        ("json", "application/json"),
        ("map", "application/json"),
        ("txt", "text/plain"),
        ("xml", "application/xml"),
        ("pdf", "application/pdf"),
        ("wasm", "application/wasm"),
        ("woff", "font/woff"),
        ("woff2", "font/woff2"),
        ("ttf", "font/ttf"),
        ("otf", "font/otf"),
    ];

    let Some((_, extension)) = path.rsplit_once('.') else {
        return FALLBACK;
    };

    KNOWN_TYPES
        .iter()
        .find(|(known, _)| extension.eq_ignore_ascii_case(known))
        .map_or(FALLBACK, |&(_, mime)| mime)
}
57
58pub async fn start_server(
59    cfg: Config, 
60    session_store: RustBasicSessionStore,
61    db: AnyPool,
62    app_router: Router<AppState>,
63) {
64    // Populate named routes
65    let mut routes_map = std::collections::HashMap::new();
66    for r in &app_router.routes {
67        if let Some(ref name) = r.name {
68            routes_map.insert(name.clone(), r.path.clone());
69        }
70    }
71    let _ = crate::router::NAMED_ROUTES.set(routes_map);
72
73    // 0. Kill port jika sedang digunakan (Force Restart)
74    kill_port_if_in_use(cfg.app_port);
75
76    // 0.5 Set Timezone Global
77    unsafe {
78        std::env::set_var("TZ", &cfg.app_timezone);
79    }
80
81    // 1. Inisialisasi State
82    let state = AppState {
83        db,
84        config: Arc::new(cfg.clone()),
85    };
86
87    // 3. Tentukan Alamat
88    let addr_str = format!("{}:{}", cfg.app_host, cfg.app_port);
89    let addr: SocketAddr = addr_str.parse().expect("Alamat server tidak valid");
90    
91    tracing::info!("{} berjalan di: http://{}", cfg.app_name, addr);
92    tracing::info!("WebSockets enabled: {}", cfg.websocket_enabled);
93    
94    // 4. Jalankan Server
95    let listener = TcpListener::bind(addr).await.unwrap();
96    
97    loop {
98        let (stream, peer_addr) = match listener.accept().await {
99            Ok(ok) => ok,
100            Err(_) => continue,
101        };
102        
103        // Optimasi latensi: Kirim paket TCP langsung tanpa buffering (disable Nagle's algorithm)
104        let _ = stream.set_nodelay(true);
105        
106        let io = TokioIo::new(stream);
107        let state = state.clone();
108        let router = app_router.clone();
109        let peer_ip = peer_addr.ip().to_string();
110        let session_store = session_store.clone();
111
112        tokio::task::spawn(async move {
113            let service = service_fn(move |req: hyper::Request<hyper::body::Incoming>| {
114                let state = state.clone();
115                let router = router.clone();
116                let peer_ip = peer_ip.clone();
117                let session_store = session_store.clone();
118                async move {
119                    let res = handle_http_request(req, peer_ip, state, router, session_store).await;
120                    Ok::<_, Infallible>(res)
121                }
122            });
123
124            if let Err(err) = http1::Builder::new()
125                .serve_connection(io, service)
126                .with_upgrades()
127                .await
128            {
129                tracing::debug!("Error serving connection: {:?}", err);
130            }
131        });
132    }
133}
134
/// Reports whether `req_path` matches the route pattern `route_path`.
///
/// Both paths are split on `/` and empty segments are ignored, so leading,
/// trailing and repeated slashes make no difference. The two paths must have
/// the same number of segments. A pattern segment written as `:name` or
/// `{name}` accepts any request segment; every other segment must be equal
/// to its counterpart.
pub(crate) fn match_path(route_path: &str, req_path: &str) -> bool {
    let mut pattern_segments = route_path.split('/').filter(|seg| !seg.is_empty());
    let mut request_segments = req_path.split('/').filter(|seg| !seg.is_empty());

    // Walk both paths in lock-step; the first disagreement decides the result.
    loop {
        match (pattern_segments.next(), request_segments.next()) {
            // Both exhausted together: every segment agreed.
            (None, None) => return true,
            (Some(pattern), Some(actual)) => {
                let is_placeholder = pattern.starts_with(':')
                    || (pattern.starts_with('{') && pattern.ends_with('}'));
                if !is_placeholder && pattern != actual {
                    return false;
                }
            }
            // One path is longer than the other.
            _ => return false,
        }
    }
}
153
/// Extracts route parameter values from a request path.
///
/// Pattern and request segments are paired positionally (empty segments are
/// skipped on both sides). A `{name}` or `:name` pattern segment contributes
/// an entry mapping `name` to the request segment at the same position.
/// Example: route `/user/{id}` with path `/user/42` yields `{"id": "42"}`.
pub(crate) fn extract_params(route_path: &str, req_path: &str) -> std::collections::HashMap<String, String> {
    let pattern_segments = route_path.split('/').filter(|seg| !seg.is_empty());
    let request_segments = req_path.split('/').filter(|seg| !seg.is_empty());

    pattern_segments
        .zip(request_segments)
        .filter_map(|(pattern, value)| {
            // `{param}` syntax is tried first, then `:param`.
            let name = pattern
                .strip_prefix('{')
                .and_then(|rest| rest.strip_suffix('}'))
                .or_else(|| pattern.strip_prefix(':'))?;
            Some((name.to_string(), value.to_string()))
        })
        .collect()
}
174
175async fn serve_static_or_404(path: &str, state: &AppState) -> Response {
176    let clean_path = path.trim_start_matches('/');
177    let file_path = if clean_path.is_empty() { "index.html" } else { clean_path };
178
179    if state.config.app_debug {
180        let disk_path = std::path::Path::new("public").join(file_path);
181        if disk_path.exists() && disk_path.is_file() {
182            if let Ok(content) = std::fs::read(&disk_path) {
183                let mime = guess_mime(file_path);
184                return http::Response::builder()
185                    .header(http::header::CONTENT_TYPE, mime)
186                    .body(content)
187                    .unwrap();
188            }
189        }
190    } else {
191        if let Some(file) = EMBEDDED_PUBLIC_GET.get().and_then(|f| f(file_path)) {
192            let mime = guess_mime(file_path);
193            return http::Response::builder()
194                .header(http::header::CONTENT_TYPE, mime)
195                .body(file.data.to_vec())
196                .unwrap();
197        }
198    }
199
200    crate::errors::ErrorController::not_found().await
201}
202
203async fn handle_http_request(
204    #[allow(unused_mut)] mut hyper_req: hyper::Request<hyper::body::Incoming>,
205    peer_ip: String,
206    state: AppState,
207    router: Router<AppState>,
208    session_store: RustBasicSessionStore,
209) -> hyper::Response<http_body_util::Full<hyper::body::Bytes>> {
210    use http_body_util::BodyExt;
211    
212    // Check for WebSocket upgrade at "/ws" route
213    let path_str = hyper_req.uri().path().to_string();
214    if path_str == "/ws" {
215        #[cfg(feature = "websocket")]
216        {
217            if !state.config.websocket_enabled {
218                return hyper::Response::builder()
219                    .status(http::StatusCode::NOT_FOUND)
220                    .body(http_body_util::Full::new(hyper::body::Bytes::from("WebSockets are disabled")))
221                    .unwrap();
222            }
223
224            if hyper_tungstenite::is_upgrade_request(&hyper_req) {
225                match hyper_tungstenite::upgrade(&mut hyper_req, None) {
226                    Ok((response, websocket)) => {
227                        tokio::spawn(async move {
228                            handle_websocket_connection(websocket).await;
229                        });
230                        let (parts, _) = response.into_parts();
231                        return hyper::Response::from_parts(parts, http_body_util::Full::new(hyper::body::Bytes::new()));
232                    }
233                    Err(e) => {
234                        tracing::error!("Gagal mengupgrade koneksi WebSocket: {:?}", e);
235                    }
236                }
237            }
238        }
239        #[cfg(not(feature = "websocket"))]
240        {
241            return hyper::Response::builder()
242                .status(http::StatusCode::NOT_FOUND)
243                .body(http_body_util::Full::new(hyper::body::Bytes::from("WebSockets feature not compiled")))
244                .unwrap();
245        }
246    }
247    
248    let (parts, body) = hyper_req.into_parts();
249    let method = parts.method.clone();
250    let uri = parts.uri.clone();
251    let path = uri.path().to_string();
252    
253    let mut headers = std::collections::HashMap::new();
254    for (name, val) in parts.headers.iter() {
255        if let Ok(val_str) = val.to_str() {
256            headers.insert(name.as_str().to_lowercase(), val_str.to_string());
257        }
258    }
259    
260    let mut inputs = serde_json::json!({});
261    if let Some(query) = uri.query() {
262        if let Ok(params) = crate::serde_urlencoded::from_str::<std::collections::HashMap<String, String>>(query) {
263            for (k, v) in params {
264                inputs[k] = serde_json::json!(v);
265            }
266        }
267    }
268    
269    let body_bytes = body.collect().await.map(|c| c.to_bytes()).unwrap_or_default();
270    let content_type = headers.get("content-type").map(|s| s.as_str()).unwrap_or("");
271    if content_type.starts_with("application/json") {
272        if let Ok(json_val) = serde_json::from_slice::<serde_json::Value>(&body_bytes) {
273            if let serde_json::Value::Object(obj) = json_val {
274                for (k, v) in obj {
275                    inputs[k] = v;
276                }
277            }
278        }
279    } else if content_type.starts_with("application/x-www-form-urlencoded") {
280        if let Ok(params) = crate::serde_urlencoded::from_bytes::<std::collections::HashMap<String, String>>(&body_bytes) {
281            for (k, v) in params {
282                inputs[k] = serde_json::json!(v);
283            }
284        }
285    }
286    
287    let mut session_id = None;
288    if let Some(cookie_header) = headers.get("cookie") {
289        for cookie in cookie_header.split(';') {
290            let parts: Vec<&str> = cookie.split('=').map(|s| s.trim()).collect();
291            if parts.len() == 2 && parts[0] == "rustbasic_session" {
292                session_id = Some(parts[1].to_string());
293                break;
294            }
295        }
296    }
297    
298    let id = session_id.unwrap_or_else(|| {
299        crate::rand::distr::Alphanumeric.sample_string(&mut crate::rand::rng(), 40)
300    });
301    
302    let session_data = if let Some(payload_str) = session_store.load(&id).await {
303        serde_json::from_str::<serde_json::Map<String, serde_json::Value>>(&payload_str).unwrap_or_default()
304    } else {
305        serde_json::Map::new()
306    };
307    
308    let session = crate::session::Session::new(id.clone());
309    *session.data.lock().unwrap() = session_data;
310    
311    if session.get::<String>("_token").is_none() {
312        let new_token = crate::rand::distr::Alphanumeric.sample_string(&mut crate::rand::rng(), 40);
313        session.set("_token", new_token);
314    }
315    
316    let req = Request {
317        inputs,
318        method: method.clone(),
319        path: path.clone(),
320        headers,
321        session: session.clone(),
322        state: state.clone(),
323        ip_address: peer_ip,
324        params: std::collections::HashMap::new(), // diisi oleh RouteDispatcher saat match
325    };
326    
327    struct RouteDispatcher {
328        router: Router<AppState>,
329        state: AppState,
330    }
331
332    #[crate::async_trait]
333    impl crate::router::ErasedHandler for RouteDispatcher {
334        async fn call(&self, req: Request) -> Response {
335            let method = req.method.clone();
336            let path = req.path.clone();
337            
338            let mut matched_handler = None;
339            let mut matched_params = std::collections::HashMap::new();
340            for route in &self.router.routes {
341                if match_path(&route.path, &path) {
342                    for (m, h) in &route.handlers {
343                        if m == &method {
344                            matched_handler = Some(h.clone());
345                            matched_params = extract_params(&route.path, &path);
346                            break;
347                        }
348                    }
349                }
350                if matched_handler.is_some() {
351                    break;
352                }
353            }
354            
355            if let Some(handler) = matched_handler {
356                // Inject route params ke request
357                let mut req = req;
358                req.params = matched_params;
359                let mut chain = std::sync::Arc::new(crate::middleware::MiddlewareChain::End(handler));
360                for mw in self.router.middlewares.iter().rev() {
361                    chain = std::sync::Arc::new(crate::middleware::MiddlewareChain::Next(mw.clone(), chain));
362                }
363                chain.next(req).await
364            } else {
365                serve_static_or_404(&path, &self.state).await
366            }
367        }
368    }
369
370    let dispatcher = std::sync::Arc::new(RouteDispatcher {
371        router,
372        state: state.clone(),
373    });
374
375    let mut chain = std::sync::Arc::new(crate::middleware::MiddlewareChain::End(dispatcher));
376    chain = std::sync::Arc::new(crate::middleware::MiddlewareChain::Next(
377        crate::middleware::from_fn(crate::middleware::security_headers::security_headers_middleware),
378        chain,
379    ));
380    chain = std::sync::Arc::new(crate::middleware::MiddlewareChain::Next(
381        crate::middleware::from_fn(crate::middleware::logging::logging_middleware),
382        chain,
383    ));
384
385    let ip = req.ip_address.clone();
386    let res = chain.next(req).await;
387    
388    let final_session_data = session.data.lock().unwrap().clone();
389    if let Ok(session_json) = serde_json::to_string(&final_session_data) {
390        session_store.store(&id, &session_json, &ip).await;
391    }
392    
393    let (mut res_parts, res_body) = res.into_parts();
394    let cookie_val = format!("rustbasic_session={}; Path=/; HttpOnly; SameSite=Lax", id);
395    res_parts.headers.insert(
396        http::header::SET_COOKIE,
397        http::HeaderValue::from_str(&cookie_val).unwrap(),
398    );
399    
400    hyper::Response::from_parts(res_parts, http_body_util::Full::new(hyper::body::Bytes::from(res_body)))
401}
402
/// Force-kills the process(es) currently listening on TCP port `_port`.
///
/// Best effort: every failure (missing tool, no permission, nothing
/// listening) is ignored. Implemented for macOS (`lsof` + `kill -9`), Linux
/// (`fuser -k`) and Windows (`netstat` + `taskkill /F`); a no-op elsewhere.
/// On macOS and Windows the function sleeps 500 ms after killing something.
///
/// This blocks the calling thread (external commands and
/// `std::thread::sleep`).
fn kill_port_if_in_use(_port: u16) {
    #[cfg(any(target_os = "macos", target_os = "linux", target_os = "windows"))]
    use std::process::Command;
    #[cfg(any(target_os = "macos", target_os = "linux", target_os = "windows"))]
    let port = _port;
    #[cfg(target_os = "macos")]
    {
        // Restrict the query to TCP sockets in LISTEN state. A bare
        // `-i:PORT` also lists clients connected to that port (e.g. a
        // browser), which would then be killed as well.
        let output = Command::new("lsof")
            .arg("-t")
            .arg(format!("-iTCP:{}", port))
            .arg("-sTCP:LISTEN")
            .output();

        if let Ok(out) = output {
            let pid_str = String::from_utf8_lossy(&out.stdout).trim().to_string();
            if !pid_str.is_empty() {
                tracing::warn!("Port {} sedang digunakan oleh PID {}. Membunuh proses...", port, pid_str);

                // `lsof -t` prints one PID per line.
                for pid in pid_str.lines().map(str::trim).filter(|pid| !pid.is_empty()) {
                    let _ = Command::new("kill")
                        .arg("-9")
                        .arg(pid)
                        .output();
                }

                // Pause after the kill before returning to the caller.
                std::thread::sleep(std::time::Duration::from_millis(500));
            }
        }
    }

    #[cfg(target_os = "linux")]
    {
        let _ = Command::new("fuser")
            .arg("-k")
            .arg(format!("{}/tcp", port))
            .output();
    }

    #[cfg(target_os = "windows")]
    {
        // Rows look like: Proto  Local-Address  Foreign-Address  [State]  PID.
        // The rows are filtered here instead of with `findstr :PORT`, which
        // matches substrings (":80" also matches ":8080") and foreign
        // addresses (clients connected to the port).
        let output = Command::new("netstat").arg("-ano").output();

        if let Ok(out) = output {
            let stdout = String::from_utf8_lossy(&out.stdout);
            let wanted_port = port.to_string();
            let mut pids: Vec<&str> = Vec::new();

            for line in stdout.lines() {
                let cols: Vec<&str> = line.split_whitespace().collect();
                if cols.len() < 4 || !cols[0].eq_ignore_ascii_case("TCP") {
                    continue;
                }
                // Only rows whose LOCAL address uses the port are relevant.
                let local_port = cols[1].rsplit_once(':').map(|(_, p)| p);
                if local_port != Some(wanted_port.as_str()) {
                    continue;
                }
                let pid = cols[cols.len() - 1];
                // PID 0 is skipped because it is not a process that can be
                // killed; each PID is handled once.
                let killable = pid.parse::<u32>().map_or(false, |n| n != 0);
                if killable && !pids.contains(&pid) {
                    pids.push(pid);
                }
            }

            for pid in pids.iter().copied() {
                tracing::warn!("Port {} sedang digunakan oleh PID {}. Membunuh proses...", port, pid);
                let _ = Command::new("taskkill")
                    .args(["/F", "/PID", pid])
                    .output();
            }
            if !pids.is_empty() {
                std::thread::sleep(std::time::Duration::from_millis(500));
            }
        }
    }
}
469
/// Drives one upgraded WebSocket connection until it closes.
///
/// Incoming text frames are parsed as JSON commands with an `action` and a
/// `channel`: `subscribe`, `unsubscribe`, or `broadcast` (which also needs
/// `event` and `data` and is forwarded to every other session on the
/// channel). Messages queued for this connection by the broadcaster are
/// written back to the socket. On exit the connection is unsubscribed from
/// every channel it is still tracked on.
#[cfg(feature = "websocket")]
async fn handle_websocket_connection(ws_stream: hyper_tungstenite::HyperWebsocket) {
    let mut socket = match ws_stream.await {
        Ok(upgraded) => upgraded,
        Err(e) => {
            crate::tracing::error!("WebSocket handshake failed: {:?}", e);
            return;
        }
    };

    let hub = crate::support::broadcaster::Broadcaster::state();
    let conn_id = hub.next_conn_id();
    // `outbox` is handed to the broadcaster on subscribe; `inbox` receives
    // whatever other connections broadcast to us.
    let (outbox, mut inbox) = tokio::sync::mpsc::unbounded_channel::<String>();
    let mut joined_channels = Vec::new();

    loop {
        tokio::select! {
            frame = socket.next() => {
                // A stream error or end of stream terminates the connection.
                let Some(Ok(frame)) = frame else { break };

                if frame.is_close() {
                    break;
                }
                if !frame.is_text() {
                    continue;
                }

                // Anything that is not a JSON command with string `action`
                // and `channel` fields is ignored.
                let payload = frame.to_text().unwrap_or("");
                let Ok(command) = serde_json::from_str::<serde_json::Value>(payload) else {
                    continue;
                };
                let Some(action) = command.get("action").and_then(|a| a.as_str()) else {
                    continue;
                };
                let Some(channel) = command.get("channel").and_then(|c| c.as_str()) else {
                    continue;
                };

                match action {
                    "subscribe" => {
                        let client = crate::support::broadcaster::ClientSession {
                            id: conn_id,
                            tx: outbox.clone(),
                        };
                        hub.subscribe(channel, client).await;
                        joined_channels.push(channel.to_string());
                    }
                    "unsubscribe" => {
                        hub.unsubscribe(channel, conn_id).await;
                        joined_channels.retain(|name| name != channel);
                    }
                    "broadcast" => {
                        let Some(event) = command.get("event").and_then(|e| e.as_str()) else {
                            continue;
                        };
                        let Some(data) = command.get("data") else {
                            continue;
                        };
                        let envelope = serde_json::json!({
                            "event": event,
                            "channel": channel,
                            "data": data
                        });
                        let Ok(encoded) = serde_json::to_string(&envelope) else {
                            continue;
                        };

                        // Fan out to every other session on the channel; a
                        // failed send to a peer is ignored.
                        let channels = hub.channels.read().await;
                        if let Some(peers) = channels.get(channel) {
                            for peer in peers {
                                if peer.id != conn_id {
                                    let _ = peer.tx.send(encoded.clone());
                                }
                            }
                        }
                    }
                    _ => {}
                }
            }
            queued = inbox.recv() => {
                let Some(text) = queued else { break };
                if socket.send(Message::Text(text.into())).await.is_err() {
                    break;
                }
            }
        }
    }

    // Drop this connection from every channel it is still tracked on.
    for channel in joined_channels {
        hub.unsubscribe(&channel, conn_id).await;
    }
}
552
553    for channel in subscribed_channels {
554        state.unsubscribe(&channel, conn_id).await;
555    }
556}