#![doc = include_str!("../README.md")]
use std::{
collections::HashMap,
future::Future,
hint,
io::{Read, Write, BufWriter},
net::{TcpListener, TcpStream, Shutdown},
pin::{Pin, pin},
sync::{Arc, Mutex, LazyLock, RwLock},
task::{Context, Poll, Waker},
time::{Instant, SystemTime, UNIX_EPOCH},
thread
};
use std::path::{Path, PathBuf};
use std::sync::atomic::{AtomicUsize, Ordering};
pub trait BodyStream: Send {
fn next_chunk(&mut self) -> Option<Vec<u8>>;
}
impl BodyStream for Vec<u8> {
fn next_chunk(&mut self) -> Option<Vec<u8>> {
if self.is_empty() {
None
} else {
Some(std::mem::take(self))
}
}
}
pub trait IntoBytes {
fn into_bytes(self) -> Vec<u8>;
}
impl IntoBytes for String {
fn into_bytes(self) -> Vec<u8> { self.into_bytes() }
}
impl IntoBytes for &str {
fn into_bytes(self) -> Vec<u8> { self.as_bytes().to_vec() }
}
impl IntoBytes for Vec<u8> {
fn into_bytes(self) -> Vec<u8> { self }
}
pub struct Req {
pub method: String,
pub path: String,
pub body: String,
pub headers: HashMap<String, String>,
pub stream: TcpStream,
}
impl Req {
pub fn form(&self) -> HashMap<String, String> {
let mut map = HashMap::new();
for pair in self.body.split('&') {
let mut kv = pair.split('=');
if let (Some(k), Some(v)) = (kv.next(), kv.next()) {
let decoded_v = v.replace('+', " ");
map.insert(k.to_string(), decoded_v);
}
}
map
}
pub fn save_to_file(&mut self, path: &str) -> std::io::Result<u64> {
let file = std::fs::File::create(path)?;
let mut writer = BufWriter::new(file);
let mut buffer = [0; 65536]; let mut total = 0;
while let Ok(n) = self.stream.read(&mut buffer) {
if n == 0 { break; }
writer.write_all(&buffer[..n])?;
total += n as u64;
}
writer.flush()?;
Ok(total)
}
pub fn upgrade_websocket(&self) -> Option<TcpStream> {
let key = self.headers.get("sec-websocket-key")?;
let accept_key = derive_websocket_accept(key);
let response = format!(
"HTTP/1.1 101 Switching Protocols\r\n\
Upgrade: websocket\r\n\
Connection: Upgrade\r\n\
Sec-WebSocket-Accept: {}\r\n\r\n",
accept_key
);
let mut s = self.stream.try_clone().ok()?;
let _ = s.write_all(response.as_bytes());
Some(s)
}
}
struct Sha1 {
state: [u32; 5],
buffer: Vec<u8>,
count: u64,
}
impl Sha1 {
fn new() -> Self {
Self {
state: [0x67452301, 0xEFCDAB89, 0x98BADCFE, 0x10325476, 0xC3D2E1F0],
buffer: Vec::new(),
count: 0,
}
}
fn update(&mut self, data: &[u8]) {
self.buffer.extend_from_slice(data);
self.count += (data.len() * 8) as u64;
}
fn finalize(mut self) -> [u8; 20] {
self.buffer.push(0x80);
while (self.buffer.len() % 64) != 56 { self.buffer.push(0); }
self.buffer.extend_from_slice(&self.count.to_be_bytes());
for chunk in self.buffer.chunks_exact(64) {
let mut w = [0u32; 80];
for i in 0..16 {
w[i] = u32::from_be_bytes([chunk[i*4], chunk[i*4+1], chunk[i*4+2], chunk[i*4+3]]);
}
for i in 16..80 {
w[i] = (w[i-3] ^ w[i-8] ^ w[i-14] ^ w[i-16]).rotate_left(1);
}
let [mut a, mut b, mut c, mut d, mut e] = self.state;
for i in 0..80 {
let (f, k) = match i {
0..=19 => ((b & c) | (!b & d), 0x5A827999),
20..=39 => (b ^ c ^ d, 0x6ED9EBA1),
40..=59 => ((b & c) | (b & d) | (c & d), 0x8F1BBCDC),
_ => (b ^ c ^ d, 0xCA62C1D6),
};
let temp = a.rotate_left(5).wrapping_add(f).wrapping_add(e).wrapping_add(k).wrapping_add(w[i]);
e = d; d = c; c = b.rotate_left(30); b = a; a = temp;
}
self.state[0] = self.state[0].wrapping_add(a);
self.state[1] = self.state[1].wrapping_add(b);
self.state[2] = self.state[2].wrapping_add(c);
self.state[3] = self.state[3].wrapping_add(d);
self.state[4] = self.state[4].wrapping_add(e);
}
let mut out = [0u8; 20];
for i in 0..5 { out[i*4..(i+1)*4].copy_from_slice(&self.state[i].to_be_bytes()); }
out
}
}
fn base64_encode(input: &[u8]) -> String {
const CHARSET: &[u8] = b"ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789+/";
let mut result = String::with_capacity((input.len() + 2) / 3 * 4);
for chunk in input.chunks(3) {
let b = match chunk.len() {
3 => (chunk[0] as u32) << 16 | (chunk[1] as u32) << 8 | (chunk[2] as u32),
2 => (chunk[0] as u32) << 16 | (chunk[1] as u32) << 8,
_ => (chunk[0] as u32) << 16,
};
result.push(CHARSET[(b >> 18 & 0x3F) as usize] as char);
result.push(CHARSET[(b >> 12 & 0x3F) as usize] as char);
if chunk.len() > 1 { result.push(CHARSET[(b >> 6 & 0x3F) as usize] as char); } else { result.push('='); }
if chunk.len() > 2 { result.push(CHARSET[(b & 0x3F) as usize] as char); } else { result.push('='); }
}
result
}
pub struct WSFrame {
pub payload: Vec<u8>,
}
impl WSFrame {
pub fn read(stream: &mut TcpStream) -> std::io::Result<Self> {
let mut head = [0u8; 2];
stream.read_exact(&mut head)?;
let is_masked = (head[1] & 0x80) != 0;
let mut len = (head[1] & 0x7F) as usize;
if len == 126 {
let mut extended_len = [0u8; 2];
stream.read_exact(&mut extended_len)?;
len = u16::from_be_bytes(extended_len) as usize;
} else if len == 127 {
let mut extended_len = [0u8; 8];
stream.read_exact(&mut extended_len)?;
len = u64::from_be_bytes(extended_len) as usize;
}
let mut payload = vec![0u8; len];
if is_masked {
let mut mask = [0u8; 4];
stream.read_exact(&mut mask)?;
stream.read_exact(&mut payload)?;
for i in 0..len {
payload[i] ^= mask[i % 4];
}
} else {
stream.read_exact(&mut payload)?;
}
Ok(WSFrame { payload })
}
}
pub struct Params(pub HashMap<String, String>);
#[derive(Copy, Clone)]
#[repr(u16)]
pub enum StatusCode {
Ok = 200,
Unauthorized = 401,
Forbidden = 403,
NotFound = 404,
InternalError = 500,
}
pub struct Reply {
pub status: u16,
pub headers: HashMap<String, String>,
pub body: Box<dyn BodyStream>,
}
impl Reply {
pub fn new(status: StatusCode) -> Self {
Self { status: status as u16, headers: HashMap::new(), body: Box::new(Vec::new()) }
}
pub fn header(mut self, key: &str, value: &str) -> Self {
self.headers.insert(key.to_string(), value.to_string());
self
}
pub fn body<T: IntoBytes>(mut self, data: T) -> Self {
self.body = Box::new(data.into_bytes());
self
}
}
pub type BoxFuture<'a, T> = Pin<Box<dyn Future<Output = T> + Send + 'a>>;
pub type Handler = Box<dyn Fn(Req, Params) -> BoxFuture<'static, Reply> + Send + Sync>;
pub type Middleware = Box<dyn Fn(&str) -> Option<Reply> + Send + Sync>;
pub enum Method { GET, POST }
pub use Method::*;
pub struct WebIo {
routes: Vec<(String, String, Handler)>,
mw: Option<Middleware>,
handlers_404: HashMap<String, Handler>,
static_dir: String,
pub log_reply_enabled: bool,
pub nagle_enabled: bool,
pub clients: Arc<Mutex<Vec<TcpStream>>>,
pub cache: Arc<RwLock<HashMap<String, Vec<u8>>>>,
}
impl WebIo {
pub fn new() -> Self {
Self {
routes: Vec::new(),
mw: None,
handlers_404: HashMap::new() ,
static_dir: "assets".to_string(), log_reply_enabled: false, nagle_enabled: true, clients: Arc::new(Mutex::new(Vec::new())),
cache: Arc::new(RwLock::new(HashMap::new())),
}
}
pub fn broadcast(&self, message: &str) {
let mut clients = self.clients.lock().expect("Registry Lock Poisoned");
let frame = encode_ws_frame(message);
clients.retain_mut(|client| {
client.write_all(&frame).is_ok()
});
}
pub fn set_nagle(mut self, enabled: bool) -> Self {
self.nagle_enabled = enabled;
self
}
fn log_reply(&self, method: &str, path: &str, status: u16, start: Instant, should_log: bool) {
if !should_log {
return;
}
let now = SystemTime::now().duration_since(UNIX_EPOCH).unwrap_or_default().as_secs();
println!(
"[{:02}:{:02}:{:02}] {} {} -> {} ({:?})",
(now/3600)%24, (now/60)%60, now%60,
method, path, status, start.elapsed() );
}
pub fn use_static(&mut self, path: &str) {
self.static_dir = path.to_string();
}
async fn serve_static(&self, path: &str) -> Option<Reply> {
let relative_path = path.trim_start_matches('/');
let base_path = PathBuf::from(&self.static_dir);
let target_path = base_path.join(relative_path);
if target_path.exists() && target_path.is_file() {
return self.create_file_reply(&target_path);
}
if relative_path == "favicon.ico" {
if let Some(found_path) = find_file_recursive(&base_path, "favicon.ico") {
return self.create_file_reply(&found_path);
}
}
None
}
fn create_file_reply(&self, path: &Path) -> Option<Reply> {
let path_str = path.to_string_lossy().to_string();
{
let cache = self.cache.read().unwrap();
if let Some(content) = cache.get(&path_str) {
return Some(self.build_reply_with_mime(path, content.clone()));
}
}
if let Ok(content) = std::fs::read(path) {
let mut cache = self.cache.write().unwrap();
if content.len() < 500 * 1024 {
cache.insert(path_str, content.clone());
}
return Some(self.build_reply_with_mime(path, content));
}
None
}
fn build_reply_with_mime(&self, path: &Path, content: Vec<u8>) -> Reply {
let ext = path.extension().and_then(|s| s.to_str()).unwrap_or("");
let ct = match ext {
"ico" => "image/x-icon",
"css" => "text/css",
"js" => "application/javascript",
"svg" => "image/svg+xml",
"png" => "image/png",
"jpg" | "jpeg" => "image/jpeg",
"gif" => "image/gif",
"mp4" => "video/mp4",
_ => "application/octet-stream",
};
Reply::new(StatusCode::Ok)
.header("Content-Type", ct)
.body(content)
}
pub fn use_mw<F>(&mut self, f: F) where F: Fn(&str) -> Option<Reply> + Send + Sync + 'static {
self.mw = Some(Box::new(f));
}
pub fn on_404<F, Fut>(&mut self, handler: F)
where
F: Fn(Req, Params) -> Fut + Send + Sync + 'static,
Fut: Future<Output = Reply> + Send + 'static,
{
let h: Handler = Box::new(move |r, p| Box::pin(handler(r, p)));
let listener = TcpListener::bind("127.0.0.1:0").expect("Failed to bind dummy listener");
let addr = listener.local_addr().expect("Failed to get local addr");
let stream = TcpStream::connect(addr).expect("Failed to connect dummy stream");
let dummy_req = Req {
method: "GET".into(),
path: "/404_sniff".into(),
body: "".into(),
headers: HashMap::new(),
stream, };
let sniff = block_on(h(dummy_req, Params(HashMap::new())));
let ct = sniff.headers.get("Content-Type").cloned().unwrap_or_default().to_lowercase();
if ct.contains("json") {
self.handlers_404.insert("json".to_string(), h);
} else {
self.handlers_404.insert("html".to_string(), h);
}
}
pub fn route<F, Fut>(&mut self, method: Method, path: &str, handler: F)
where F: Fn(Req, Params) -> Fut + Send + Sync + 'static, Fut: Future<Output = Reply> + Send + 'static,
{
let m = match method { GET => "GET", POST => "POST" }.to_string();
self.routes.push((m, path.to_string(), Box::new(move |r, p| Box::pin(handler(r, p)))));
}
pub fn run(self, host: &str, port: &str) {
let listener = TcpListener::bind(format!("{}:{}", host, port)).expect("Bind failed");
let app = Arc::new(self);
let active_threads = Arc::new(AtomicUsize::new(0));
let max_threads = 2000;
println!("š¦
WebIO Multi-Threaded Engine Live: http://{}:{}", host, port);
for stream in listener.incoming() {
if let Ok(s) = stream {
let a = Arc::clone(&app);
let counter = Arc::clone(&active_threads);
if counter.load(Ordering::SeqCst) < max_threads {
counter.fetch_add(1, Ordering::SeqCst);
let _ = s.set_nodelay(!app.nagle_enabled);
thread::spawn(move || {
a.handle_connection(s);
counter.fetch_sub(1, Ordering::SeqCst);
});
} else {
println!("ā ļø Server Busy: Max threads reached!");
}
}
}
}
fn handle_connection(&self, mut stream: TcpStream) {
let start_time = Instant::now();
let _ = stream.set_nodelay(!self.nagle_enabled);
let _ = stream.set_read_timeout(Some(std::time::Duration::from_millis(150)));
let mut buffer = [0; 4096];
let n = match stream.read(&mut buffer) { Ok(n) if n > 0 => n, _ => return };
let header_str = String::from_utf8_lossy(&buffer[..n]);
let mut lines = header_str.lines();
let first_line = lines.next().unwrap_or("");
let parts: Vec<&str> = first_line.split_whitespace().collect();
if parts.len() < 2 { return; }
let (method, full_path) = (parts[0], parts[1]);
let mut headers = HashMap::new();
for line in lines {
if line.is_empty() { break; }
if let Some((k, v)) = line.split_once(": ") {
headers.insert(k.to_lowercase(), v.to_string());
}
}
let max_body_size = 10 * 1024 * 1024; let content_length = headers.get("content-length")
.and_then(|v| v.parse::<usize>().ok())
.unwrap_or(0);
if content_length > max_body_size {
let too_large = Reply::new(StatusCode::Forbidden).body("Payload Too Large (Max 10MB)");
block_on(self.finalize(stream, too_large, method, full_path, start_time));
return;
}
let body = if let Some(pos) = header_str.find("\r\n\r\n") {
let initial_body = &header_str[pos + 4..];
if initial_body.len() >= content_length {
initial_body[..content_length].to_string()
} else {
let mut remaining = vec![0u8; content_length - initial_body.len()];
let _ = stream.read_exact(&mut remaining);
format!("{}{}", initial_body, String::from_utf8_lossy(&remaining))
}
} else {
String::new()
};
if let Some(ref mw_func) = self.mw {
if let Some(early_reply) = mw_func(full_path) {
block_on(self.finalize(stream, early_reply, method, full_path, start_time));
return;
}
}
let path_only = full_path.split('?').next().unwrap_or("/");
let mut final_params = HashMap::new();
let mut active_handler: Option<&Handler> = None;
let path_segments: Vec<&str> = path_only.split('/').filter(|s| !s.is_empty()).collect();
for (r_method, r_path, handler) in &self.routes {
if r_method != method { continue; }
let route_segments: Vec<&str> = r_path.split('/').filter(|s| !s.is_empty()).collect();
if route_segments.len() == path_segments.len() {
let mut matches = true;
let mut temp_params = HashMap::new();
for (r_seg, p_seg) in route_segments.iter().zip(path_segments.iter()) {
if r_seg.starts_with('<') && r_seg.ends_with('>') {
temp_params.insert(r_seg[1..r_seg.len()-1].to_string(), p_seg.to_string());
} else if r_seg != p_seg { matches = false; break; }
}
if matches { final_params = temp_params; active_handler = Some(handler); break; }
}
}
let req = Req {
method: method.to_string(),
path: full_path.to_string(),
body,
headers,
stream: stream.try_clone().expect("CRITICAL: Socket clone failed"),
};
let reply = if let Some(handler) = active_handler {
block_on(handler(req, Params(final_params)))
} else if let Some(static_reply) = block_on(self.serve_static(path_only)) {
static_reply
} else {
let accept = req.headers.get("accept").cloned().unwrap_or_default();
let h_404 = if accept.contains("text/html") {
self.handlers_404.get("html")
} else {
self.handlers_404.get("json")
};
if let Some(h) = h_404 {
block_on(h(req, Params(HashMap::new())))
} else {
Reply::new(StatusCode::NotFound).body("404 Not Found")
}
};
block_on(self.finalize(stream, reply, method, full_path, start_time));
}
async fn finalize(&self, stream: TcpStream, reply: Reply, method: &str, path: &str, start: Instant) {
{
let mut writer = BufWriter::with_capacity(65536, &stream);
let mut head = format!(
"HTTP/1.1 {} OK\r\nConnection: close\r\nTransfer-Encoding: chunked\r\n", reply.status
);
for (k, v) in &reply.headers {
head.push_str(&format!("{}: {}\r\n", k, v));
}
head.push_str("\r\n");
let _ = writer.write_all(head.as_bytes());
let mut b = reply.body;
while let Some(data) = b.next_chunk() {
let _ = writer.write_all(format!("{:X}\r\n", data.len()).as_bytes());
let _ = writer.write_all(&data);
let _ = writer.write_all(b"\r\n");
}
let _ = writer.write_all(b"0\r\n\r\n");
let _ = writer.flush();
}
self.log_reply(method, path, reply.status, start, self.log_reply_enabled);
let _ = stream.shutdown(Shutdown::Both);
}
}
pub fn block_on<F: Future>(future: F) -> F::Output {
let mut future = pin!(future);
let waker = Waker::noop();
let mut cx = Context::from_waker(waker);
let mut spins = 0u64;
loop {
match future.as_mut().poll(&mut cx) {
Poll::Ready(v) => return v,
Poll::Pending => {
if spins < 150_000 {
hint::spin_loop();
spins += 1;
} else {
thread::yield_now();
spins = 0;
}
}
}
}
}
fn find_file_recursive(dir: &std::path::Path, filename: &str) -> Option<std::path::PathBuf> {
if !dir.is_dir() { return None; }
let current_check = dir.join(filename);
if current_check.exists() { return Some(current_check); }
if let Ok(entries) = std::fs::read_dir(dir) {
for entry in entries.flatten() {
let path = entry.path();
if path.is_dir() {
if let Some(found) = find_file_recursive(&path, filename) {
return Some(found);
}
}
}
}
None
}
fn derive_websocket_accept(key: &str) -> String {
let mut hasher = Sha1::new();
hasher.update(key.as_bytes());
hasher.update(b"258EAFA5-E914-47DA-95CA-C5AB0DC85B11");
base64_encode(&hasher.finalize())
}
fn encode_ws_frame(message: &str) -> Vec<u8> {
let payload = message.as_bytes();
let len = payload.len();
let mut frame = Vec::new();
frame.push(0x81);
if len <= 125 {
frame.push(len as u8);
} else if len <= 65535 {
frame.push(126);
frame.extend_from_slice(&(len as u16).to_be_bytes());
} else {
frame.push(127);
frame.extend_from_slice(&(len as u64).to_be_bytes());
}
frame.extend_from_slice(payload);
frame
}
pub static CLIENTS: LazyLock<Mutex<Vec<TcpStream>>> = LazyLock::new(|| {
Mutex::new(Vec::new())
});
pub fn broadcast(message: &str) {
if let Ok(mut clients) = CLIENTS.lock() {
let frame = encode_ws_frame(message);
clients.retain_mut(|client| {
client.write_all(&frame).is_ok()
});
}
}