1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
//! # WebIO Orchestration Engine
//!
//! This module serves as the central controller for the WebIO framework. It
//! manages the lifecycle of TCP connections, routing logic, and high-level
//! features like **Smart Asset Caching** and **Global WebSocket Broadcasting**.
//!
//! ### Key Orchestration Features:
//! * **Pre-emptive Multi-threading:** Spawns a dedicated OS thread per connection,
//! mimicking Go's concurrency model but with Rust's deterministic memory safety.
//! * **Smart RAM Cache:** Utilizes an `RwLock<HashMap>` to provide **~50µs** delivery
//! for "hot" assets (<500KB) while keeping large datasets on disk to prevent RAM exhaustion.
//! * **Global Client Registry:** Maintains a thread-safe `Arc<Mutex<Vec<TcpStream>>>`
//! allowing any thread to broadcast real-time data to all connected WebSocket clients.
//! * **Dynamic Nagle Control:** Allows developers to toggle `TCP_NODELAY` via `.set_nagle()`,
//! optimizing for either sub-millisecond API latency or high-throughput data syncs.
use std::collections::HashMap;
use std::sync::{Arc, Mutex, RwLock};
use std::sync::atomic::{AtomicUsize, Ordering};
use std::net::{TcpListener, TcpStream, Shutdown};
use std::io::{Read, Write, BufWriter};
use std::time::{Instant, SystemTime, UNIX_EPOCH};
use std::thread;
use std::path::{Path, PathBuf};
use std::future::Future;
use crate::{core::*, mime};
use crate::utils::{find_file_recursive, encode_ws_frame};
/// Supported HTTP Methods.
pub enum Method {
/// Requests a representation of the specified resource.
///
/// This is the most common method, used to **Retrieve Data** (like HTML,
/// images, or JSON) from the server without modifying any state.
GET,
/// Submits an entity to the specified resource.
///
/// Primarily used for **Form Submissions** or **Data Uploads**,
/// often resulting in a change in state or side effects on the server.
POST,
/// Replaces all current representations of the target resource.
///
/// Used for **Full Updates**, where the client sends a complete
/// version of the resource to overwrite the existing one.
PUT,
/// Applies partial modifications to a resource.
///
/// Ideal for **Incremental Updates**, allowing the client to
/// modify only specific fields instead of replacing the entire object.
PATCH,
/// Deletes the specified resource.
///
/// This method is used to **Permanently Remove** a file, database
/// record, or other resource identified by the URI.
DELETE,
/// Asks for a response identical to a GET request, but without the response body.
///
/// Extremely useful for **Meta-data Checks**, such as verifying file
/// size or MIME type via headers before committing to a full download.
HEAD,
/// Used to describe the communication options for the target resource.
///
/// Essential for **CORS (Cross-Origin Resource Sharing)**, where
/// browsers perform a "pre-flight" check to see which methods are permitted.
OPTIONS,
/// Establishes a tunnel to the server identified by the target resource.
///
/// Typically used for **SSL/TLS Proxying**, allowing secure
/// communication to pass through intermediate servers.
CONNECT,
/// Performs a message loop-back test along the path to the target resource.
///
/// Primarily a **Diagnostic Tool**, it echoes back the received
/// request so the client can see what changes proxies might be making.
TRACE,
/// Creates a duplicate of the source resource at the destination.
///
/// Primarily used in **File Management** to copy files or directories
/// without downloading and re-uploading the data.
COPY,
/// Moves a resource from one URI to another.
///
/// Equivalent to a **Rename** or **Cut-and-Paste** operation on a
/// remote file system.
MOVE,
/// Creates a new collection (folder/directory) at the specified location.
///
/// Used in **WebDAV** instead of `POST` or `PUT` to explicitly create
/// directory structures.
MKCOL,
/// Retrieves properties (metadata) for a resource.
///
/// This is the most used WebDAV method. it allows the client to ask:
/// "Is this a file or folder? What is its size? When was it modified?"
PROPFIND,
/// Used to "lock" a resource to prevent overwrite conflicts.
///
/// Essential for **Collaborative Editing**, ensuring only one user
/// can modify a file at a time.
LOCK,
/// Removes a lock previously established by the LOCK method.
///
/// Replaces the resource in a "read-only" or "shared" state so
/// other users can begin editing.
UNLOCK
}
pub use Method::*;
/// The central Application controller.
/// Manages routing, middleware, and the internal TCP lifecycle.
pub struct WebIo {
routes: Vec<(String, String, Handler)>,
pub mime_types: HashMap<String, String>,
mw: Option<Middleware>,
handlers_404: HashMap<String, Handler>,
static_dir: String,
pub log_reply_enabled: bool,
pub nagle_enabled: bool,
pub clients: Arc<Mutex<Vec<TcpStream>>>,
pub cache: Arc<RwLock<HashMap<String, Vec<u8>>>>,
pub banner_text: String,
}
impl WebIo {
/// Initializes a new WebIo instance with an empty routing table.
pub fn new() -> Self {
Self {
routes: Vec::new(),
mime_types: mime::default_mime_types(),
mw: None,
handlers_404: HashMap::new() ,
static_dir: "assets".to_string(), // Default name ==> "assets"
log_reply_enabled: false, // default value
nagle_enabled: true, // Default to ON (High Throughput)
clients: Arc::new(Mutex::new(Vec::new())),
cache: Arc::new(RwLock::new(HashMap::new())),
banner_text: "🦅 WebIO Live:".to_string(), // Default banner_text ==> "🦅 WebIO Live:"
}
}
pub fn broadcast(&self, message: &str) {
// Lock the global client list
let mut clients = self.clients.lock().expect("Registry Lock Poisoned");
// Wrap the text message in the binary WebSocket protocol
let frame = encode_ws_frame(message);
// Send to everyone and automatically clean up "ghost" connections
clients.retain_mut(|client| {
client.write_all(&frame).is_ok()
});
}
/// Toggles TCP_NODELAY (Nagle's Algorithm)
pub fn set_nagle(mut self, enabled: bool) -> Self {
self.nagle_enabled = enabled;
self
}
/// Logs the details of an HTTP reply, including method, path, status code, and processing time.
///
/// # Parameters
/// - `method`: The HTTP method of the handled request (e.g., "GET", "POST").
/// - `path`: The URL path that was processed (e.g., "/api/data").
/// - `status`: The HTTP status code of the outgoing reply (e.g., 200, 404).
/// - `start`: The `Instant` timestamp when the request started, used to calculate reply latency.
/// - `should_log`: A boolean flag indicating whether to perform logging. If `false`, the function returns immediately.
///
/// # Behavior
/// - If `should_log` is `false`, no log is produced.
/// - If `true`, logs a timestamped message showing the reply details and the elapsed time since `start`.
/// - The timestamp is formatted as HH:MM:SS based on the current system time.
/// - The log includes high-resolution timing (`start.elapsed()`) to measure the full lifecycle of the reply.
fn log_reply(&self, method: &str, path: &str, status: u16, start: Instant, should_log: bool) {
if !should_log {
return;
}
// --- High-Resolution Performance Logging ---
// We calculate the precise duration from the moment the TCP stream was accepted
// until the final byte of the chunked response is flushed.
let now = SystemTime::now().duration_since(UNIX_EPOCH).unwrap_or_default().as_secs();
println!(
"[{:02}:{:02}:{:02}] {} {} -> {} ({:?})",
(now/3600)%24, (now/60)%60, now%60,
method, path, status, start.elapsed() // Direct high-resolution measurement
);
}
/// Configures the root directory for serving static assets (CSS, JS, Images, etc.).
/// Example: app.use_static("src/assets");
pub fn use_static(&mut self, path: &str) {
self.static_dir = path.to_string();
}
/// Internal helper to serve static files from the configured directory.
async fn serve_static(&self, path: &str) -> Option<Reply> {
let relative_path = path.trim_start_matches('/');
let base_path = PathBuf::from(&self.static_dir);
let target_path = base_path.join(relative_path);
// 1. Direct Match (e.g., /css/style.css)
if target_path.exists() && target_path.is_file() {
return self.create_file_reply(&target_path);
}
// 2. Dynamic Discovery for favicon.ico
// If the browser asks for /favicon.ico but it's not at the root, search all folders.
if relative_path == "favicon.ico" {
if let Some(found_path) = find_file_recursive(&base_path, "favicon.ico") {
return self.create_file_reply(&found_path);
}
}
None
}
fn create_file_reply(&self, path: &Path) -> Option<Reply> {
let path_str = path.to_string_lossy().to_string();
// --- STEP 1: FAST PATH (Check RAM Cache) ---
{
let cache = self.cache.read().unwrap();
if let Some(content) = cache.get(&path_str) {
return Some(self.build_reply_with_mime(path, content.clone()));
}
}
// --- STEP 2: SLOW PATH (Read Disk & Save to RAM) ---
if let Ok(content) = std::fs::read(path) {
let mut cache = self.cache.write().unwrap();
// ONLY CACHE SMALL FILES (e.g., under 500KB)
// This keeps your 28ms CSS in RAM, but leaves 100MB CSVs on Disk.
if content.len() < 500 * 1024 {
cache.insert(path_str, content.clone());
}
return Some(self.build_reply_with_mime(path, content));
}
None
}
/// Sets (adds or updates) a single MIME type mapping in the registry.
///
/// If the extension already exists, the associated MIME type is updated
/// to the new value.
///
/// # Arguments
/// * `ext` - The file extension (e.g., "webp").
/// * `mime` - The corresponding MIME media type (e.g., "image/webp").
///
/// # Example
/// ```
/// use webio::*;
/// let mut app = WebIo::new();
/// app.set_mime("webp", "image/webp");
/// ```
pub fn set_mime(&mut self, ext: &str, mime: &str) {
mime::set_mime_logic(&mut self.mime_types, ext, mime);
}
/// Sets multiple MIME type mappings at once.
///
/// Useful for bulk-updating the registry from a configuration list.
///
/// # Arguments
/// * `types` - A vector of tuples containing `(extension, mime_type)`.
///
/// # Example
/// ```
/// use webio::*;
/// let mut app = WebIo::new();
/// app.set_mimes(vec![("woff2", "font/woff2"), ("wasm", "application/wasm")]);
/// ```
pub fn set_mimes(&mut self, types: Vec<(&str, &str)>) {
mime::set_mimes_logic(&mut self.mime_types, types);
}
/// Removes a single MIME type mapping from the registry.
///
/// If the extension is not found in the registry, the function does nothing.
///
/// # Arguments
/// * `ext` - The file extension to be removed (e.g., "php").
///
/// # Example
/// ```
/// use webio::*;
/// let mut app = WebIo::new();
/// // Disable serving of PHP files
/// app.remove_mime("php");
/// ```
pub fn remove_mime(&mut self, ext: &str) {
mime::remove_mime_logic(&mut self.mime_types, ext);
}
/// Removes multiple MIME type mappings from the registry at once.
///
/// Ideal for disabling entire categories of files (e.g., all video formats).
///
/// # Arguments
/// * `exts` - A vector of file extensions to be removed.
///
/// # Example
/// ```
/// use webio::*;
/// let mut app = WebIo::new();
/// // Disable multiple video formats
/// let to_remove = vec!["mp4", "webm", "avi", "mov"];
/// app.remove_mimes(to_remove);
/// ```
pub fn remove_mimes(&mut self, exts: Vec<&str>) {
mime::remove_mimes_logic(&mut self.mime_types, exts);
}
/// Internal helper to look up MIME types and construct a `Reply` object.
///
/// This method extracts the file extension from the provided `Path`,
/// performs a case-insensitive lookup in the registry, and sets the
/// `Content-Type` header accordingly.
///
/// # Fallback
/// If no extension is found or the extension is unknown, it defaults
/// to `application/octet-stream`.
fn build_reply_with_mime(&self, path: &Path, content: Vec<u8>) -> Reply {
// 1. Extract the extension, convert to string, and handle case-sensitivity
let ext = path.extension()
.and_then(|s| s.to_str())
.map(|s| s.to_lowercase()) // Ensures .JPG and .jpg both work
.unwrap_or_default();
// 2. Dynamic lookup in the HashMap
// If the extension isn't found, it fall back to "application/octet-stream".
let ct = self.mime_types.get(&ext)
.map(|s| s.as_str())
.unwrap_or("application/octet-stream");
// 3. Build and return the Reply
Reply::new(StatusCode::Ok)
.header("Content-Type", ct)
.body(content)
}
/// Registers a global middleware function.
/// If the middleware returns `Some(Reply)`, the request cycle ends early.
pub fn use_mw<F>(&mut self, f: F) where F: Fn(&str) -> Option<Reply> + Send + Sync + 'static {
self.mw = Some(Box::new(f));
}
/// Configures custom 404 handlers.
pub fn on_404<F, Fut>(&mut self, handler: F)
where
F: Fn(Req, Params) -> Fut + Send + Sync + 'static,
Fut: Future<Output = Reply> + Send + 'static,
{
let h: Handler = Box::new(move |r, p| Box::pin(handler(r, p)));
// 1. Create a temporary listener and connect to it to get a valid TcpStream
let listener = TcpListener::bind("127.0.0.1:0").expect("Failed to bind dummy listener");
let addr = listener.local_addr().expect("Failed to get local addr");
let stream = TcpStream::connect(addr).expect("Failed to connect dummy stream");
// 2. Build the dummy Req with the new stream field
let dummy_req = Req {
method: "GET".into(),
path: "/404_sniff".into(),
body: "".into(),
headers: HashMap::new(),
stream, // Added to satisfy the new Req struct
};
// 3. Execute the handler to "sniff" the Content-Type
let sniff = block_on(h(dummy_req, Params(HashMap::new())));
// 4. Categorize based on the headers HashMap
let ct = sniff.headers.get("Content-Type").cloned().unwrap_or_default().to_lowercase();
if ct.contains("json") {
self.handlers_404.insert("json".to_string(), h);
} else {
self.handlers_404.insert("html".to_string(), h);
}
}
// Defines a route with a specific method and path.
/// Supports dynamic segments using `<name>` syntax (e.g., `/user/<id>`).
pub fn route<F, Fut>(&mut self, method: Method, path: &str, handler: F)
where F: Fn(Req, Params) -> Fut + Send + Sync + 'static, Fut: Future<Output = Reply> + Send + 'static,
{
let m = match method {
GET => "GET",
POST => "POST",
PUT => "PUT",
PATCH => "PATCH",
DELETE => "DELETE",
HEAD => "HEAD",
OPTIONS => "OPTIONS",
CONNECT => "CONNECT",
TRACE => "TRACE",
COPY => "COPY",
MOVE => "MOVE",
MKCOL => "MKCOL",
PROPFIND => "PROPFIND",
LOCK => "LOCK",
UNLOCK => "UNLOCK"
}.to_string();
self.routes.push((m, path.to_string(), Box::new(move |r, p| Box::pin(handler(r, p)))));
}
/// Helper to determine the banner content.
///
/// If `text` is empty, it returns the default framework message:
/// `"🦅 WebIO Live:"`. Otherwise, it returns the user-provided string.
///
/// # Arguments
///
/// * `text` - A string slice that holds the custom banner message.
fn rename_banner(text: &str) -> String {
if text.is_empty() {
"🦅 WebIO Live:".to_string()
} else {
format!("{}", text)
}
}
/// Initializes the Multi-Threaded TCP Listener and enters the primary execution loop.
///
/// This method serves as the entry point for the **Go-inspired concurrency model**,
/// binding the engine to a specific network interface and delegating every incoming
/// connection to a dedicated OS thread. This architecture ensures pre-emptive
/// multitasking, where computational-heavy tasks (e.g., matrix calculations)
/// cannot block the responsiveness of other network participants.
///
/// ### Reliability & Safety:
/// * **The Safety Valve (Thread Guard):** Implements an `AtomicUsize` concurrency
/// throttle to prevent system resource exhaustion. If the active connection
/// count exceeds `max_threads` (2,000), subsequent connections are gracefully
/// discarded to maintain the stability of the host environment.
/// * **Atomic Lifecycle Management:** Utilizes `Ordering::SeqCst` for deterministic
/// tracking of thread entry and exit, ensuring the global state is consistent
/// across all CPU cores.
/// * **Nagle Optimization:** Dynamically applies the `TCP_NODELAY` configuration
/// per-socket, allowing the engine to be tuned for either low-latency
/// interactive APIs or high-throughput **Big Data** ingestion.
pub fn run(self, host: &str, port: &str) {
let listener = TcpListener::bind(format!("{}:{}", host, port)).expect("Bind failed");
let banner = Self::rename_banner(&self.banner_text);
let app = Arc::new(self);
// --- SAFETY VALVE: Thread Limit ---
// Prevents "Thread-Bombing" attacks and protects OS kernel stability.
let active_threads = Arc::new(AtomicUsize::new(0));
let max_threads = 2000; // Typical safe limit for OS threads
// Displays the server startup banner and the local access URL.
// This is the primary visual confirmation that the multi-threaded engine is live.
println!("{} http://{}:{}", banner, host, port);
for stream in listener.incoming() {
if let Ok(s) = stream {
let a = Arc::clone(&app);
let counter = Arc::clone(&active_threads);
// Deterministic Concurrency Check
if counter.load(Ordering::SeqCst) < max_threads {
counter.fetch_add(1, Ordering::SeqCst);
// Toggle Nagle's Algorithm (TCP_NODELAY) based on engine config
let _ = s.set_nodelay(!app.nagle_enabled);
// OS-Level Isolation:
// Move the connection to a dedicated thread for pre-emptive multitasking.
thread::spawn(move || {
a.handle_connection(s);
// Atomic Cleanup:
// Decrement the counter regardless of connection outcome.
counter.fetch_sub(1, Ordering::SeqCst);
});
} else {
// Resource Exhaustion Policy: Drop connection to preserve core stability.
println!("⚠️ Server Busy: Max threads reached!");
}
}
}
}
// --- WORKER IMPLEMENTATION ---
/// Internal worker function responsible for the full HTTP request-response lifecycle
/// within an isolated OS thread.
///
/// This function acts as the **Synchronous-to-Asynchronous Bridge**, utilizing
/// the `block_on` executor to drive asynchronous handlers while ensuring
/// computational isolation. It manages raw buffer parsing, metadata extraction,
/// RAM security enforcement, and dynamic route dispatching.
///
/// ### Protocol Execution Phases:
/// 1. **Initial Ingestion:** Reads the first 4KB of data from the socket to
/// identify the HTTP verb, path, and header block.
/// 2. **RAM Safety Guard:** Inspects the `Content-Length` header pre-emptively.
/// If the payload exceeds 10MB, the connection is terminated with a
/// `403 Forbidden` to protect the system's heap memory.
/// 3. **Deterministic Body Extraction:** Uses `read_exact` to guarantee body
/// integrity, ensuring the framework remains resilient against network jitter
/// during high-concurrency **Big Data** ingestion.
/// 4. **Dynamic Segment Routing:** Performs O(N) path analysis to extract
/// `<name>` parameters, populating the `Params` collection for the handler.
/// 5. **Async Future Resolution:** Bridges into the async world via `block_on`,
/// driving the route handler or static asset delivery to completion.
///
/// ### Thread Isolation:
/// By running this worker in a dedicated OS thread, WebIO ensures that if a
/// handler enters a long-running mathematical loop, it does not impede
/// the I/O processing or `spin_wait` cycles of other active connections.
fn handle_connection(&self, mut stream: TcpStream) {
let start_time = Instant::now();
// --- 1. TCP OPTIMIZATION ---
// Dynamically toggle Nagle's algorithm based on engine settings.
// set_nagle(false) -> Low Latency (Best for small JSON/HTML, PWA)
// set_nagle(true) -> High Throughput (Best for Big Data CSVs)
let _ = stream.set_nodelay(!self.nagle_enabled);
let _ = stream.set_read_timeout(Some(std::time::Duration::from_millis(150)));
// --- 2. HEADER INGESTION ---
let mut buffer = [0; 4096];
let n = match stream.read(&mut buffer) { Ok(n) if n > 0 => n, _ => return };
let header_str = String::from_utf8_lossy(&buffer[..n]);
let mut lines = header_str.lines();
let first_line = lines.next().unwrap_or("");
let parts: Vec<&str> = first_line.split_whitespace().collect();
if parts.len() < 2 { return; }
let (method, full_path) = (parts[0], parts[1]);
// Parse key-value metadata into a lowercase-indexed HashMap.
let mut headers = HashMap::new();
for line in lines {
if line.is_empty() { break; }
if let Some((k, v)) = line.split_once(": ") {
headers.insert(k.to_lowercase(), v.to_string());
}
}
// --- 3. RAM SAFETY GUARD ---
let max_body_size = 10 * 1024 * 1024; // 10MB Threshold
let content_length = headers.get("content-length")
.and_then(|v| v.parse::<usize>().ok())
.unwrap_or(0);
// Pre-emptive rejection to prevent heap exhaustion.
if content_length > max_body_size {
let too_large = Reply::new(StatusCode::Forbidden).body("Payload Too Large (Max 10MB)");
block_on(self.finalize(stream, too_large, method, full_path, start_time));
return;
}
// --- 4. SMART PAYLOAD EXTRACTION ---
// Protocol-accurate read to handle data spanning across multiple TCP packets.
let body = if let Some(pos) = header_str.find("\r\n\r\n") {
let initial_body = &header_str[pos + 4..];
if initial_body.len() >= content_length {
// Case A: Small body already inside the 4KB buffer
initial_body[..content_length].to_string()
} else {
// Case B: Larger body requires further reading from the socket
let mut remaining = vec![0u8; content_length - initial_body.len()];
let _ = stream.read_exact(&mut remaining);
format!("{}{}", initial_body, String::from_utf8_lossy(&remaining))
}
} else {
String::new()
};
// --- 5. MIDDLEWARE EXECUTION ---
if let Some(ref mw_func) = self.mw {
if let Some(early_reply) = mw_func(full_path) {
block_on(self.finalize(stream, early_reply, method, full_path, start_time));
return;
}
}
// --- 6. ROUTING & DISPATCH ---
let path_only = full_path.split('?').next().unwrap_or("/");
let mut final_params = HashMap::new();
let mut active_handler: Option<&Handler> = None;
let path_segments: Vec<&str> = path_only.split('/').filter(|s| !s.is_empty()).collect();
for (r_method, r_path, handler) in &self.routes {
if r_method != method { continue; }
let route_segments: Vec<&str> = r_path.split('/').filter(|s| !s.is_empty()).collect();
if route_segments.len() == path_segments.len() {
let mut matches = true;
let mut temp_params = HashMap::new();
for (r_seg, p_seg) in route_segments.iter().zip(path_segments.iter()) {
if r_seg.starts_with('<') && r_seg.ends_with('>') {
temp_params.insert(r_seg[1..r_seg.len()-1].to_string(), p_seg.to_string());
} else if r_seg != p_seg { matches = false; break; }
}
if matches { final_params = temp_params; active_handler = Some(handler); break; }
}
}
// --- 7. REQUEST OBJECTIFICATION ---
let req = Req {
method: method.to_string(),
path: full_path.to_string(),
body,
headers,
// Allows handlers to stream GBs of data directly to disk/DB
// Pass cloned TcpStream to allow raw Big Data streaming in the handler.
stream: stream.try_clone().expect("CRITICAL: Socket clone failed"),
};
// --- 8. RESPONSE GENERATION ---
let reply = if let Some(handler) = active_handler {
block_on(handler(req, Params(final_params)))
} else if let Some(static_reply) = block_on(self.serve_static(path_only)) {
static_reply
} else {
// Content-Type Aware Smart 404
let accept = req.headers.get("accept").cloned().unwrap_or_default();
let h_404 = if accept.contains("text/html") {
self.handlers_404.get("html")
} else {
self.handlers_404.get("json")
};
if let Some(h) = h_404 {
block_on(h(req, Params(HashMap::new())))
} else {
Reply::new(StatusCode::NotFound).body("404 Not Found")
}
};
// --- PROTOCOL FINALIZATION ---
block_on(self.finalize(stream, reply, method, full_path, start_time));
}
// --- PROTOCOL FINALIZATION & SERIALIZATION ---
/// Finalizes the HTTP response by serializing metadata and streaming body fragments
/// directly to the network interface.
///
/// This method implements the **HTTP/1.1 Chunked Transfer Encoding** protocol,
/// providing a highly efficient mechanism for delivering large datasets (Big Data)
/// without requiring prior knowledge of the total payload size.
///
/// ### Performance Architecture:
/// * **Syscall Minimization:** Utilizes a high-capacity (64KB) `BufWriter` to
/// batch headers and initial data fragments. This minimizes the "Context Switch Tax"
/// between user-space and the OS kernel, frequently enabling **50µs - 150µs**
/// response times in local environments.
/// * **Memory Safety:** By iterating through the `BodyStream` trait, the engine
/// moves data in discrete chunks, preventing the allocation of massive byte
/// vectors on the heap and ensuring a stable RAM profile regardless of file size.
/// * **Low-Latency Finalization:** Employs `TCP_NODELAY` (when configured) to
/// bypass Nagle's algorithm, ensuring that small metadata headers are transmitted
/// instantly rather than being buffered by the network stack.
/// * **Zero-Copy Serialization:** Response headers are pushed directly into the
/// buffered writer, reducing intermediate string allocations during the
/// serialization lifecycle.
///
/// ### Diagnostic & Telemetry:
/// Upon completion, the method triggers high-resolution telemetry logging,
/// capturing the precise duration from the initial `TcpStream` acceptance
/// to the final byte flush. Immediate socket shutdown follows to prevent
/// "hanging" connections and optimize resource reclamation.
async fn finalize(&self, stream: TcpStream, reply: Reply, method: &str, path: &str, start: Instant) {
{
// Optimization: 64KB Buffer ensures headers and first chunks
// occupy a single MTU packet for maximum throughput.
let mut writer = BufWriter::with_capacity(65536, &stream);
// Construct the Status Line and Protocol Headers.
// Transfer-Encoding: chunked is the foundation of WebIO's streaming engine.
let mut head = format!(
"HTTP/1.1 {} OK\r\nConnection: close\r\nTransfer-Encoding: chunked\r\n", reply.status
);
for (k, v) in &reply.headers {
head.push_str(&format!("{}: {}\r\n", k, v));
}
head.push_str("\r\n");
let _ = writer.write_all(head.as_bytes());
// --- BIG DATA STREAMING LOOP ---
// Consumes chunks from the BodyStream until exhaustion.
let mut b = reply.body;
while let Some(data) = b.next_chunk() {
// Protocol Format: [Hex Size]\r\n[Payload]\r\n
let _ = writer.write_all(format!("{:X}\r\n", data.len()).as_bytes());
let _ = writer.write_all(&data);
let _ = writer.write_all(b"\r\n");
}
// Termination: RFC-standard zero-length chunk
let _ = writer.write_all(b"0\r\n\r\n");
let _ = writer.flush();
}
// Final Telemetry: High-resolution timestamping
self.log_reply(method, path, reply.status, start, self.log_reply_enabled);
// Resource Reclamation: Force OS-level socket closure
let _ = stream.shutdown(Shutdown::Both);
}
}