Skip to main content

rustbasic_core/
server.rs

1use crate::app::Config;
2use crate::tracing;
3use crate::session_manager::RustBasicSessionStore;
4use crate::router::{Router, Response};
5use crate::requests::Request;
6use std::net::SocketAddr;
7use crate::sql::AnyPool;
8use std::sync::Arc;
9use std::convert::Infallible;
10use tokio::net::TcpListener;
11use hyper::service::service_fn;
12use hyper_util::rt::TokioIo;
13use hyper::server::conn::http1;
14use crate::rand::distr::SampleString;
15#[cfg(feature = "websocket")]
16use futures_util::{SinkExt, StreamExt};
17#[cfg(feature = "websocket")]
18use tokio_tungstenite::tungstenite::Message;
19
20#[derive(Clone)]
21pub struct AppState {
22    pub db: AnyPool,
23    pub config: Arc<Config>,
24}
25
26static EMBEDDED_PUBLIC_GET: std::sync::OnceLock<fn(&str) -> Option<crate::rust_embed::EmbeddedFile>> = std::sync::OnceLock::new();
27
28pub fn set_embedded_public(f: fn(&str) -> Option<crate::rust_embed::EmbeddedFile>) {
29    EMBEDDED_PUBLIC_GET.set(f).ok();
30}
31
32pub fn get_embedded_public_fn() -> Option<fn(&str) -> Option<crate::rust_embed::EmbeddedFile>> {
33    EMBEDDED_PUBLIC_GET.get().copied()
34}
35
36fn guess_mime(path: &str) -> &'static str {
37    if path.ends_with(".js") {
38        "application/javascript"
39    } else if path.ends_with(".css") {
40        "text/css"
41    } else if path.ends_with(".html") {
42        "text/html"
43    } else if path.ends_with(".png") {
44        "image/png"
45    } else if path.ends_with(".jpg") || path.ends_with(".jpeg") {
46        "image/jpeg"
47    } else if path.ends_with(".svg") {
48        "image/svg+xml"
49    } else if path.ends_with(".ico") {
50        "image/x-icon"
51    } else if path.ends_with(".json") {
52        "application/json"
53    } else if path.ends_with(".woff") {
54        "font/woff"
55    } else if path.ends_with(".woff2") {
56        "font/woff2"
57    } else {
58        "application/octet-stream"
59    }
60}
61
62pub async fn start_server(
63    cfg: Config, 
64    session_store: RustBasicSessionStore,
65    db: AnyPool,
66    app_router: Router<AppState>,
67) {
68    // Populate named routes
69    let mut routes_map = std::collections::HashMap::new();
70    for r in &app_router.routes {
71        if let Some(ref name) = r.name {
72            routes_map.insert(name.clone(), r.path.clone());
73        }
74    }
75    let _ = crate::router::NAMED_ROUTES.set(routes_map);
76
77    // 0. Kill port jika sedang digunakan (Force Restart)
78    kill_port_if_in_use(cfg.app_port);
79
80    // 0.5 Set Timezone Global
81    unsafe {
82        std::env::set_var("TZ", &cfg.app_timezone);
83    }
84
85    // 1. Inisialisasi State
86    let state = AppState {
87        db,
88        config: Arc::new(cfg.clone()),
89    };
90
91    // 3. Tentukan Alamat
92    let addr_str = format!("{}:{}", cfg.app_host, cfg.app_port);
93    let addr: SocketAddr = addr_str.parse().expect("Alamat server tidak valid");
94    
95    tracing::info!("{} berjalan di: http://{}", cfg.app_name, addr);
96    tracing::info!("WebSockets enabled: {}", cfg.websocket_enabled);
97    
98    // 4. Jalankan Server
99    let listener = TcpListener::bind(addr).await.unwrap();
100    
101    loop {
102        let (stream, peer_addr) = match listener.accept().await {
103            Ok(ok) => ok,
104            Err(_) => continue,
105        };
106        
107        // Optimasi latensi: Kirim paket TCP langsung tanpa buffering (disable Nagle's algorithm)
108        let _ = stream.set_nodelay(true);
109        
110        let io = TokioIo::new(stream);
111        let state = state.clone();
112        let router = app_router.clone();
113        let peer_ip = peer_addr.ip().to_string();
114        let session_store = session_store.clone();
115
116        tokio::task::spawn(async move {
117            let service = service_fn(move |req: hyper::Request<hyper::body::Incoming>| {
118                let state = state.clone();
119                let router = router.clone();
120                let peer_ip = peer_ip.clone();
121                let session_store = session_store.clone();
122                async move {
123                    let res = handle_http_request(req, peer_ip, state, router, session_store).await;
124                    Ok::<_, Infallible>(res)
125                }
126            });
127
128            if let Err(err) = http1::Builder::new()
129                .serve_connection(io, service)
130                .with_upgrades()
131                .await
132            {
133                tracing::debug!("Error serving connection: {:?}", err);
134            }
135        });
136    }
137}
138
139pub(crate) fn match_path(route_path: &str, req_path: &str) -> bool {
140    let r_parts: Vec<&str> = route_path.split('/').filter(|s| !s.is_empty()).collect();
141    let q_parts: Vec<&str> = req_path.split('/').filter(|s| !s.is_empty()).collect();
142    
143    if r_parts.len() != q_parts.len() {
144        return false;
145    }
146    
147    for (r, q) in r_parts.iter().zip(q_parts.iter()) {
148        if r.starts_with(':') || (r.starts_with('{') && r.ends_with('}')) {
149            continue;
150        }
151        if r != q {
152            return false;
153        }
154    }
155    true
156}
157
158/// Ekstrak nilai route parameter dari URL request.
159/// Contoh: route="/user/{id}", path="/user/42" → {"id": "42"}
160pub(crate) fn extract_params(route_path: &str, req_path: &str) -> std::collections::HashMap<String, String> {
161    let mut params = std::collections::HashMap::new();
162    let r_parts: Vec<&str> = route_path.split('/').filter(|s| !s.is_empty()).collect();
163    let q_parts: Vec<&str> = req_path.split('/').filter(|s| !s.is_empty()).collect();
164
165    for (r, q) in r_parts.iter().zip(q_parts.iter()) {
166        if r.starts_with('{') && r.ends_with('}') {
167            // Sintaks {param}
168            let key = &r[1..r.len() - 1];
169            params.insert(key.to_string(), q.to_string());
170        } else if let Some(key) = r.strip_prefix(':') {
171            // Sintaks :param
172            params.insert(key.to_string(), q.to_string());
173        }
174    }
175    params
176}
177
178async fn serve_static_or_404(path: &str, state: &AppState) -> Response {
179    let clean_path = path.trim_start_matches('/');
180    let file_path = if clean_path.is_empty() { "index.html" } else { clean_path };
181
182    if state.config.app_debug {
183        let disk_path = std::path::Path::new("public").join(file_path);
184        if disk_path.exists() && disk_path.is_file()
185            && let Ok(content) = std::fs::read(&disk_path) {
186                let mime = guess_mime(file_path);
187                return http::Response::builder()
188                    .header(http::header::CONTENT_TYPE, mime)
189                    .body(content)
190                    .unwrap();
191            }
192    } else {
193        if let Some(file) = EMBEDDED_PUBLIC_GET.get().and_then(|f| f(file_path)) {
194            let mime = guess_mime(file_path);
195            return http::Response::builder()
196                .header(http::header::CONTENT_TYPE, mime)
197                .body(file.data.to_vec())
198                .unwrap();
199        }
200    }
201
202    crate::errors::ErrorController::not_found().await
203}
204
205async fn handle_http_request(
206    #[allow(unused_mut)] mut hyper_req: hyper::Request<hyper::body::Incoming>,
207    peer_ip: String,
208    state: AppState,
209    router: Router<AppState>,
210    session_store: RustBasicSessionStore,
211) -> hyper::Response<http_body_util::Full<hyper::body::Bytes>> {
212    use http_body_util::BodyExt;
213    
214    // Check for WebSocket upgrade at "/ws" route
215    let path_str = hyper_req.uri().path().to_string();
216    if path_str == "/ws" {
217        #[cfg(feature = "websocket")]
218        {
219            if !state.config.websocket_enabled {
220                return hyper::Response::builder()
221                    .status(http::StatusCode::NOT_FOUND)
222                    .body(http_body_util::Full::new(hyper::body::Bytes::from("WebSockets are disabled")))
223                    .unwrap();
224            }
225
226            if hyper_tungstenite::is_upgrade_request(&hyper_req) {
227                match hyper_tungstenite::upgrade(&mut hyper_req, None) {
228                    Ok((response, websocket)) => {
229                        tokio::spawn(async move {
230                            handle_websocket_connection(websocket).await;
231                        });
232                        let (parts, _) = response.into_parts();
233                        return hyper::Response::from_parts(parts, http_body_util::Full::new(hyper::body::Bytes::new()));
234                    }
235                    Err(e) => {
236                        tracing::error!("Gagal mengupgrade koneksi WebSocket: {:?}", e);
237                    }
238                }
239            }
240        }
241        #[cfg(not(feature = "websocket"))]
242        {
243            return hyper::Response::builder()
244                .status(http::StatusCode::NOT_FOUND)
245                .body(http_body_util::Full::new(hyper::body::Bytes::from("WebSockets feature not compiled")))
246                .unwrap();
247        }
248    }
249    
250    let (parts, body) = hyper_req.into_parts();
251    let method = parts.method.clone();
252    let uri = parts.uri.clone();
253    let path = uri.path().to_string();
254    
255    let mut headers = std::collections::HashMap::new();
256    for (name, val) in parts.headers.iter() {
257        if let Ok(val_str) = val.to_str() {
258            headers.insert(name.as_str().to_lowercase(), val_str.to_string());
259        }
260    }
261    
262    let mut inputs = serde_json::json!({});
263    if let Some(query) = uri.query()
264        && let Ok(params) = crate::serde_urlencoded::from_str::<std::collections::HashMap<String, String>>(query) {
265            for (k, v) in params {
266                inputs[k] = serde_json::json!(v);
267            }
268        }
269    
270    let body_bytes = body.collect().await.map(|c| c.to_bytes()).unwrap_or_default();
271    let content_type = headers.get("content-type").map(|s| s.as_str()).unwrap_or("");
272    if content_type.starts_with("application/json") {
273        if let Ok(json_val) = serde_json::from_slice::<serde_json::Value>(&body_bytes)
274            && let serde_json::Value::Object(obj) = json_val {
275                for (k, v) in obj {
276                    inputs[k] = v;
277                }
278            }
279    } else if content_type.starts_with("application/x-www-form-urlencoded")
280        && let Ok(params) = crate::serde_urlencoded::from_bytes::<std::collections::HashMap<String, String>>(&body_bytes) {
281            for (k, v) in params {
282                inputs[k] = serde_json::json!(v);
283            }
284        }
285    
286    let mut session_id = None;
287    if let Some(cookie_header) = headers.get("cookie") {
288        for cookie in cookie_header.split(';') {
289            let parts: Vec<&str> = cookie.split('=').map(|s| s.trim()).collect();
290            if parts.len() == 2 && parts[0] == "rustbasic_session" {
291                session_id = Some(parts[1].to_string());
292                break;
293            }
294        }
295    }
296    
297    let id = session_id.unwrap_or_else(|| {
298        crate::rand::distr::Alphanumeric.sample_string(&mut crate::rand::rng(), 40)
299    });
300    
301    let session_data = if let Some(payload_str) = session_store.load(&id).await {
302        serde_json::from_str::<serde_json::Map<String, serde_json::Value>>(&payload_str).unwrap_or_default()
303    } else {
304        serde_json::Map::new()
305    };
306    
307    let session = crate::session::Session::new(id.clone());
308    *session.data.lock().unwrap() = session_data;
309    
310    if session.get::<String>("_token").is_none() {
311        let new_token = crate::rand::distr::Alphanumeric.sample_string(&mut crate::rand::rng(), 40);
312        session.set("_token", new_token);
313    }
314    
315    let req = Request {
316        inputs,
317        method: method.clone(),
318        path: path.clone(),
319        headers,
320        session: session.clone(),
321        state: state.clone(),
322        ip_address: peer_ip,
323        params: std::collections::HashMap::new(), // diisi oleh RouteDispatcher saat match
324    };
325    
326    struct RouteDispatcher {
327        router: Router<AppState>,
328        state: AppState,
329    }
330
331    #[crate::async_trait]
332    impl crate::router::ErasedHandler for RouteDispatcher {
333        async fn call(&self, req: Request) -> Response {
334            let method = req.method.clone();
335            let path = req.path.clone();
336            
337            let mut matched_handler = None;
338            let mut matched_params = std::collections::HashMap::new();
339            for route in &self.router.routes {
340                if match_path(&route.path, &path) {
341                    for (m, h) in &route.handlers {
342                        if m == method {
343                            matched_handler = Some(h.clone());
344                            matched_params = extract_params(&route.path, &path);
345                            break;
346                        }
347                    }
348                }
349                if matched_handler.is_some() {
350                    break;
351                }
352            }
353            
354            if let Some(handler) = matched_handler {
355                // Inject route params ke request
356                let mut req = req;
357                req.params = matched_params;
358                let mut chain = std::sync::Arc::new(crate::middleware::MiddlewareChain::End(handler));
359                for mw in self.router.middlewares.iter().rev() {
360                    chain = std::sync::Arc::new(crate::middleware::MiddlewareChain::Next(mw.clone(), chain));
361                }
362                chain.next(req).await
363            } else {
364                serve_static_or_404(&path, &self.state).await
365            }
366        }
367    }
368
369    let dispatcher = std::sync::Arc::new(RouteDispatcher {
370        router,
371        state: state.clone(),
372    });
373
374    let mut chain = std::sync::Arc::new(crate::middleware::MiddlewareChain::End(dispatcher));
375    chain = std::sync::Arc::new(crate::middleware::MiddlewareChain::Next(
376        crate::middleware::from_fn(crate::middleware::security_headers::security_headers_middleware),
377        chain,
378    ));
379    chain = std::sync::Arc::new(crate::middleware::MiddlewareChain::Next(
380        crate::middleware::from_fn(crate::middleware::logging::logging_middleware),
381        chain,
382    ));
383
384    let ip = req.ip_address.clone();
385    let res = chain.next(req).await;
386    
387    let final_session_data = session.data.lock().unwrap().clone();
388    if let Ok(session_json) = serde_json::to_string(&final_session_data) {
389        session_store.store(&id, &session_json, &ip).await;
390    }
391    
392    let (mut res_parts, res_body) = res.into_parts();
393    let cookie_val = format!("rustbasic_session={}; Path=/; HttpOnly; SameSite=Lax", id);
394    res_parts.headers.insert(
395        http::header::SET_COOKIE,
396        http::HeaderValue::from_str(&cookie_val).unwrap(),
397    );
398    
399    hyper::Response::from_parts(res_parts, http_body_util::Full::new(hyper::body::Bytes::from(res_body)))
400}
401
402fn kill_port_if_in_use(_port: u16) {
403    #[cfg(any(target_os = "macos", target_os = "linux", target_os = "windows"))]
404    use std::process::Command;
405    #[cfg(any(target_os = "macos", target_os = "linux", target_os = "windows"))]
406    let port = _port;
407    #[cfg(target_os = "macos")]
408    {
409        let output = Command::new("lsof")
410            .arg("-t")
411            .arg(format!("-i:{}", port))
412            .output();
413
414        if let Ok(out) = output {
415            let pid_str = String::from_utf8_lossy(&out.stdout).trim().to_string();
416            if !pid_str.is_empty() {
417                tracing::warn!("Port {} sedang digunakan oleh PID {}. Membunuh proses...", port, pid_str);
418                
419                for pid in pid_str.split('\n') {
420                    if !pid.is_empty() {
421                        let _ = Command::new("kill")
422                            .arg("-9")
423                            .arg(pid)
424                            .output();
425                    }
426                }
427
428                std::thread::sleep(std::time::Duration::from_millis(500));
429            }
430        }
431    }
432
433    #[cfg(target_os = "linux")]
434    {
435        let _ = Command::new("fuser")
436            .arg("-k")
437            .arg(format!("{}/tcp", port))
438            .output();
439    }
440
441    #[cfg(target_os = "windows")]
442    {
443        let output = Command::new("cmd")
444            .args(&["/C", &format!("netstat -ano | findstr :{}", port)])
445            .output();
446
447        if let Ok(out) = output {
448            let stdout = String::from_utf8_lossy(&out.stdout);
449            let mut found = false;
450            for line in stdout.lines() {
451                let parts: Vec<&str> = line.split_whitespace().collect();
452                if let Some(pid) = parts.last() {
453                    if pid.parse::<u32>().is_ok() {
454                        tracing::warn!("Port {} sedang digunakan oleh PID {}. Membunuh proses...", port, pid);
455                        let _ = Command::new("taskkill")
456                            .args(&["/F", "/PID", pid])
457                            .output();
458                        found = true;
459                    }
460                }
461            }
462            if found {
463                std::thread::sleep(std::time::Duration::from_millis(500));
464            }
465        }
466    }
467}
468
469#[cfg(feature = "websocket")]
470async fn handle_websocket_connection(ws_stream: hyper_tungstenite::HyperWebsocket) {
471    let mut ws = match ws_stream.await {
472        Ok(w) => w,
473        Err(e) => {
474            crate::tracing::error!("WebSocket handshake failed: {:?}", e);
475            return;
476        }
477    };
478
479    let state = crate::support::broadcaster::Broadcaster::state();
480    let conn_id = state.next_conn_id();
481    let (tx, mut rx) = tokio::sync::mpsc::unbounded_channel::<String>();
482    let mut subscribed_channels = Vec::new();
483
484    loop {
485        tokio::select! {
486            incoming = ws.next() => {
487                let msg = match incoming {
488                    Some(Ok(m)) => m,
489                    Some(Err(_)) | None => break,
490                };
491
492                if msg.is_text() {
493                    let text = msg.to_text().unwrap_or("");
494                    if let Ok(val) = serde_json::from_str::<serde_json::Value>(text)
495                        && let Some(action) = val.get("action").and_then(|a| a.as_str())
496                            && let Some(channel) = val.get("channel").and_then(|c| c.as_str()) {
497                                match action {
498                                    "subscribe" => {
499                                        let session = crate::support::broadcaster::ClientSession {
500                                            id: conn_id,
501                                            tx: tx.clone(),
502                                        };
503                                        state.subscribe(channel, session).await;
504                                        subscribed_channels.push(channel.to_string());
505                                    }
506                                    "unsubscribe" => {
507                                        state.unsubscribe(channel, conn_id).await;
508                                        subscribed_channels.retain(|c| c != channel);
509                                    }
510                                    "broadcast" => {
511                                        if let Some(event) = val.get("event").and_then(|e| e.as_str())
512                                            && let Some(data) = val.get("data") {
513                                                let msg = serde_json::json!({
514                                                    "event": event,
515                                                    "channel": channel,
516                                                    "data": data
517                                                });
518                                                if let Ok(msg_str) = serde_json::to_string(&msg) {
519                                                    let channels = state.channels.read().await;
520                                                    if let Some(sessions) = channels.get(channel) {
521                                                        for session in sessions {
522                                                            if session.id != conn_id {
523                                                                let _ = session.tx.send(msg_str.clone());
524                                                            }
525                                                        }
526                                                    }
527                                                }
528                                            }
529                                    }
530                                    _ => {}
531                                }
532                            }
533                } else if msg.is_close() {
534                    break;
535                }
536            }
537            outgoing = rx.recv() => {
538                let text = match outgoing {
539                    Some(t) => t,
540                    None => break,
541                };
542                if ws.send(Message::Text(text)).await.is_err() {
543                    break;
544                }
545            }
546        }
547    }
548
549    for channel in subscribed_channels {
550        state.unsubscribe(&channel, conn_id).await;
551    }
552}
553
554pub async fn bootstrap<
555    M: crate::schema::MigratorTrait + Send + Sync + 'static,
556    S: crate::seeder::SeederTrait + Send + Sync + 'static,
557>(
558    args: &[String],
559    app_router: Router<AppState>,
560    embedded_templates: fn(&str) -> Option<crate::rust_embed::EmbeddedFile>,
561    embedded_public: fn(&str) -> Option<crate::rust_embed::EmbeddedFile>,
562    seeder: Option<S>,
563) {
564    // 1. Muat Konfigurasi
565    let cfg = Config::load();
566
567    // 2. Cek Command CLI
568    if args.len() > 1 && crate::cli::handle::<M, S>(
569        args,
570        &cfg,
571        seeder,
572    ).await {
573        return;
574    }
575
576    // 3. Setup Database
577    let db = crate::database::connect(&cfg).await;
578
579    // 4. Inisialisasi Session Store
580    crate::session::init_sessions(&cfg).await;
581    let session_store = crate::session::setup_session(&cfg).await;
582
583    // 5. Inject embedded files
584    crate::view::set_embedded_templates(embedded_templates);
585    set_embedded_public(embedded_public);
586
587    // 6. Jalankan Server
588    start_server(cfg, session_store, db, app_router).await;
589}