// vapor_http/lib.rs

1mod scheduler;
2
3pub use scheduler::{DynamicThreadPool, RpsTracker};
4
5use std::sync::Arc;
6use std::time::Duration;
7use tokio::net::{TcpListener, TcpStream};
8use tokio::io::{AsyncReadExt, AsyncWriteExt};
9use tokio::sync::Semaphore;
10use tracing::{info, warn, error};
11
12pub type HandlerFn = Arc<dyn Fn(Request) -> Response + Send + Sync>;
13
/// A parsed HTTP request as handed to a handler.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct Request {
    /// Request method token exactly as sent (e.g. `GET`, `POST`).
    pub method: String,
    /// Request target exactly as sent, including any query string.
    pub path: String,
    /// Header `(name, value)` pairs in the order received, both trimmed.
    /// Names keep the casing the client used.
    pub headers: Vec<(String, String)>,
    /// Request body, or `None` when the request carried none.
    pub body: Option<String>,
}
21
/// An HTTP response produced by a handler.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct Response {
    /// Numeric status code placed on the status line.
    pub status: u16,
    /// Header `(name, value)` pairs, emitted in order.
    pub headers: Vec<(String, String)>,
    /// Response body.
    pub body: String,
}

impl Response {
    /// Builds a `200` response with `Content-Type: text/plain` and the given body.
    #[must_use]
    pub fn ok(body: impl Into<String>) -> Self {
        Self::status(200)
            .with_header("Content-Type", "text/plain")
            .body(body)
    }

    /// Builds a `200` response with `Content-Type: application/json` and the
    /// given body. The body is used verbatim; it is not validated as JSON.
    #[must_use]
    pub fn json(body: impl Into<String>) -> Self {
        Self::status(200)
            .with_header("Content-Type", "application/json")
            .body(body)
    }

    /// Builds a response with the given status code, no headers and an empty body.
    #[must_use]
    pub fn status(code: u16) -> Self {
        Self {
            status: code,
            headers: Vec::new(),
            body: String::new(),
        }
    }

    /// Builds a `404` response with body `Not Found` and no headers.
    #[must_use]
    pub fn not_found() -> Self {
        Self::status(404).body("Not Found")
    }

    /// Builds a `500` response whose body is `msg`, with no headers.
    #[must_use]
    pub fn internal_error(msg: impl Into<String>) -> Self {
        Self::status(500).body(msg)
    }

    /// Appends a header and returns the response for further chaining.
    ///
    /// Existing headers with the same name are kept; this never replaces.
    #[must_use]
    pub fn with_header(mut self, key: impl Into<String>, value: impl Into<String>) -> Self {
        self.headers.push((key.into(), value.into()));
        self
    }

    /// Replaces the body and returns the response for further chaining.
    #[must_use]
    pub fn body(mut self, body: impl Into<String>) -> Self {
        self.body = body.into();
        self
    }
}
80
81pub struct Vapor {
82    pool: Arc<DynamicThreadPool>,
83    rps_tracker: Arc<RpsTracker>,
84    handler: HandlerFn,
85    port: u16,
86}
87
88impl Vapor {
89    pub fn new(handler: HandlerFn) -> Self {
90        Self {
91            pool: Arc::new(DynamicThreadPool::new()),
92            rps_tracker: Arc::new(RpsTracker::new(1000)),
93            handler,
94            port: 8080,
95        }
96    }
97
98    pub fn port(mut self, port: u16) -> Self {
99        self.port = port;
100        self
101    }
102
103    pub fn pool(mut self, pool: DynamicThreadPool) -> Self {
104        self.pool = Arc::new(pool);
105        self
106    }
107
108    pub fn rps_tracker(&self) -> &RpsTracker {
109        &self.rps_tracker
110    }
111
112    pub async fn run(self) -> Result<(), Box<dyn std::error::Error>> {
113        let addr = format!("0.0.0.0:{}", self.port);
114        let listener = TcpListener::bind(&addr).await?;
115        let listener = Arc::new(listener);
116        
117        info!("Vapor starting on {} with {} threads", addr, self.pool.current_threads());
118        
119        let pool = self.pool.clone();
120        let rps_tracker = self.rps_tracker.clone();
121        let handler = self.handler.clone();
122        
123        let sem = Arc::new(Semaphore::new(pool.current_threads()));
124        let sem_for_scale = sem.clone();
125        
126        tokio::spawn(async move {
127            loop {
128                if let Some(scale_up) = pool.should_scale() {
129                    let current = pool.current_threads();
130                    if scale_up {
131                        let new_count = (current + 1).min(pool.max_threads());
132                        pool.set_thread_count(new_count);
133                        let additional = new_count.saturating_sub(sem_for_scale.available_permits());
134                        if additional > 0 {
135                            sem_for_scale.add_permits(additional);
136                        }
137                        info!("Scaling up: {} -> {} threads", current, new_count);
138                    } else {
139                        let new_count = (current - 1).max(pool.min_threads());
140                        pool.set_thread_count(new_count);
141                    }
142                }
143                tokio::time::sleep(Duration::from_millis(100)).await;
144            }
145        });
146
147        let sem2 = sem.clone();
148        let listener2 = listener.clone();
149        tokio::spawn(async move {
150            loop {
151                let sem_clone = sem2.clone();
152                let permit = match sem_clone.acquire().await {
153                    Ok(p) => p,
154                    Err(_) => continue,
155                };
156                
157                match listener2.accept().await {
158                    Ok((stream, _)) => {
159                        rps_tracker.record();
160                        let h = handler.clone();
161                        if let Err(e) = handle_connection(stream, h).await {
162                            warn!("Connection error: {}", e);
163                        }
164                        drop(permit);
165                    }
166                    Err(e) => {
167                        drop(permit);
168                        error!("Accept error: {}", e);
169                    }
170                }
171            }
172        });
173
174        use std::future::pending;
175        
176        pending::<()>().await;
177        unreachable!();
178    }
179}
180
181async fn handle_connection(
182    mut stream: TcpStream,
183    handler: HandlerFn,
184) -> Result<(), Box<dyn std::error::Error>> {
185    let mut buffer = vec![0u8; 8192];
186    let n = stream.read(&mut buffer).await?;
187    if n == 0 {
188        return Ok(());
189    }
190
191    let request_str = String::from_utf8_lossy(&buffer[..n]);
192    let request = parse_request(&request_str);
193    let response = handler(request);
194
195    let response_bytes = build_response(&response);
196    stream.write_all(&response_bytes).await?;
197    stream.flush().await?;
198
199    Ok(())
200}
201
202fn parse_request(request_str: &str) -> Request {
203    let lines: Vec<&str> = request_str.lines().collect();
204    if lines.is_empty() {
205        return Request {
206            method: "GET".to_string(),
207            path: "/".to_string(),
208            headers: vec![],
209            body: None,
210        };
211    }
212
213    let first_line: Vec<&str> = lines[0].split_whitespace().collect();
214    let method = first_line.get(0).unwrap_or(&"GET").to_string();
215    let path = first_line.get(1).unwrap_or(&"/").to_string();
216
217    let mut headers = vec![];
218    let mut body_start = 0;
219    
220    for (i, line) in lines.iter().enumerate().skip(1) {
221        if line.is_empty() {
222            body_start = i + 1;
223            break;
224        }
225        if let Some((key, value)) = line.split_once(':') {
226            headers.push((key.trim().to_string(), value.trim().to_string()));
227        }
228    }
229
230    let body = if body_start > 0 && body_start < lines.len() {
231        Some(lines[body_start..].join("\n"))
232    } else {
233        None
234    };
235
236    Request { method, path, headers, body }
237}
238
239fn build_response(response: &Response) -> Vec<u8> {
240    let status_text = match response.status {
241        200 => "OK",
242        404 => "Not Found",
243        500 => "Internal Server Error",
244        _ => "Unknown",
245    };
246
247    let mut response_str = format!(
248        "HTTP/1.1 {} {}\r\n",
249        response.status, status_text
250    );
251
252    for (key, value) in &response.headers {
253        response_str.push_str(&format!("{}: {}\r\n", key, value));
254    }
255
256    response_str.push_str(&format!("Content-Length: {}\r\n", response.body.len()));
257    response_str.push_str("\r\n");
258    response_str.push_str(&response.body);
259
260    response_str.into_bytes()
261}
262
263pub async fn shutdown() {
264    tokio::signal::ctrl_c().await.ok();
265}