1mod scheduler;
2
3pub use scheduler::{DynamicThreadPool, RpsTracker};
4
5use std::sync::Arc;
6use std::time::Duration;
7use tokio::net::{TcpListener, TcpStream};
8use tokio::io::{AsyncReadExt, AsyncWriteExt};
9use tokio::sync::Semaphore;
10use tracing::{info, warn, error};
11
12pub type HandlerFn = Arc<dyn Fn(Request) -> Response + Send + Sync>;
13
/// An HTTP request in the form handed to a [`HandlerFn`].
///
/// Requests compare by value (`PartialEq`/`Eq`), which makes them usable in
/// assertions and lookups.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct Request {
    /// Method token taken from the request line (for example `GET`).
    pub method: String,
    /// Request target taken from the request line (for example `/`).
    pub path: String,
    /// Header fields as `(name, value)` pairs, in the order they were parsed.
    pub headers: Vec<(String, String)>,
    /// Request body, or `None` when the request carried no body.
    pub body: Option<String>,
}
21
/// An HTTP response produced by a handler.
///
/// Responses compare by value (`PartialEq`/`Eq`), so handler output can be
/// checked directly in assertions.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct Response {
    /// Numeric HTTP status code.
    pub status: u16,
    /// Header fields as `(name, value)` pairs, emitted in insertion order.
    pub headers: Vec<(String, String)>,
    /// Response body text.
    pub body: String,
}

impl Response {
    /// Creates a `200` response with `Content-Type: text/plain` and the given body.
    pub fn ok(body: impl Into<String>) -> Self {
        Self {
            status: 200,
            headers: vec![("Content-Type".to_string(), "text/plain".to_string())],
            body: body.into(),
        }
    }

    /// Creates a `200` response with `Content-Type: application/json`.
    ///
    /// The body is used as given; it is not validated or serialized here.
    pub fn json(body: impl Into<String>) -> Self {
        Self {
            status: 200,
            headers: vec![("Content-Type".to_string(), "application/json".to_string())],
            body: body.into(),
        }
    }

    /// Creates a response with the given status code, no headers and an empty body.
    pub fn status(code: u16) -> Self {
        Self {
            status: code,
            headers: Vec::new(),
            body: String::new(),
        }
    }

    /// Creates a `404` response with the body `Not Found` and no headers.
    pub fn not_found() -> Self {
        Self {
            status: 404,
            headers: Vec::new(),
            body: "Not Found".to_string(),
        }
    }

    /// Creates a `500` response with `msg` as the body and no headers.
    pub fn internal_error(msg: impl Into<String>) -> Self {
        Self {
            status: 500,
            headers: Vec::new(),
            body: msg.into(),
        }
    }

    /// Appends a header and returns the updated response.
    ///
    /// The header is always appended; an existing header with the same name is
    /// left in place, so repeated names produce repeated header lines.
    #[must_use = "builder methods return the updated response instead of mutating in place"]
    pub fn with_header(mut self, key: impl Into<String>, value: impl Into<String>) -> Self {
        self.headers.push((key.into(), value.into()));
        self
    }

    /// Replaces the body and returns the updated response.
    #[must_use = "builder methods return the updated response instead of mutating in place"]
    pub fn body(mut self, body: impl Into<String>) -> Self {
        self.body = body.into();
        self
    }
}
80
81pub struct Vapor {
82 pool: Arc<DynamicThreadPool>,
83 rps_tracker: Arc<RpsTracker>,
84 handler: HandlerFn,
85 port: u16,
86}
87
88impl Vapor {
89 pub fn new(handler: HandlerFn) -> Self {
90 Self {
91 pool: Arc::new(DynamicThreadPool::new()),
92 rps_tracker: Arc::new(RpsTracker::new(1000)),
93 handler,
94 port: 8080,
95 }
96 }
97
98 pub fn port(mut self, port: u16) -> Self {
99 self.port = port;
100 self
101 }
102
103 pub fn pool(mut self, pool: DynamicThreadPool) -> Self {
104 self.pool = Arc::new(pool);
105 self
106 }
107
108 pub fn rps_tracker(&self) -> &RpsTracker {
109 &self.rps_tracker
110 }
111
112 pub async fn run(self) -> Result<(), Box<dyn std::error::Error>> {
113 let addr = format!("0.0.0.0:{}", self.port);
114 let listener = TcpListener::bind(&addr).await?;
115 let listener = Arc::new(listener);
116
117 info!("Vapor starting on {} with {} threads", addr, self.pool.current_threads());
118
119 let pool = self.pool.clone();
120 let rps_tracker = self.rps_tracker.clone();
121 let handler = self.handler.clone();
122
123 let sem = Arc::new(Semaphore::new(pool.current_threads()));
124 let sem_for_scale = sem.clone();
125
126 tokio::spawn(async move {
127 loop {
128 if let Some(scale_up) = pool.should_scale() {
129 let current = pool.current_threads();
130 if scale_up {
131 let new_count = (current + 1).min(pool.max_threads());
132 pool.set_thread_count(new_count);
133 let additional = new_count.saturating_sub(sem_for_scale.available_permits());
134 if additional > 0 {
135 sem_for_scale.add_permits(additional);
136 }
137 info!("Scaling up: {} -> {} threads", current, new_count);
138 } else {
139 let new_count = (current - 1).max(pool.min_threads());
140 pool.set_thread_count(new_count);
141 }
142 }
143 tokio::time::sleep(Duration::from_millis(100)).await;
144 }
145 });
146
147 let sem2 = sem.clone();
148 let listener2 = listener.clone();
149 tokio::spawn(async move {
150 loop {
151 let sem_clone = sem2.clone();
152 let permit = match sem_clone.acquire().await {
153 Ok(p) => p,
154 Err(_) => continue,
155 };
156
157 match listener2.accept().await {
158 Ok((stream, _)) => {
159 rps_tracker.record();
160 let h = handler.clone();
161 if let Err(e) = handle_connection(stream, h).await {
162 warn!("Connection error: {}", e);
163 }
164 drop(permit);
165 }
166 Err(e) => {
167 drop(permit);
168 error!("Accept error: {}", e);
169 }
170 }
171 }
172 });
173
174 use std::future::pending;
175
176 pending::<()>().await;
177 unreachable!();
178 }
179}
180
181async fn handle_connection(
182 mut stream: TcpStream,
183 handler: HandlerFn,
184) -> Result<(), Box<dyn std::error::Error>> {
185 let mut buffer = vec![0u8; 8192];
186 let n = stream.read(&mut buffer).await?;
187 if n == 0 {
188 return Ok(());
189 }
190
191 let request_str = String::from_utf8_lossy(&buffer[..n]);
192 let request = parse_request(&request_str);
193 let response = handler(request);
194
195 let response_bytes = build_response(&response);
196 stream.write_all(&response_bytes).await?;
197 stream.flush().await?;
198
199 Ok(())
200}
201
202fn parse_request(request_str: &str) -> Request {
203 let lines: Vec<&str> = request_str.lines().collect();
204 if lines.is_empty() {
205 return Request {
206 method: "GET".to_string(),
207 path: "/".to_string(),
208 headers: vec![],
209 body: None,
210 };
211 }
212
213 let first_line: Vec<&str> = lines[0].split_whitespace().collect();
214 let method = first_line.get(0).unwrap_or(&"GET").to_string();
215 let path = first_line.get(1).unwrap_or(&"/").to_string();
216
217 let mut headers = vec![];
218 let mut body_start = 0;
219
220 for (i, line) in lines.iter().enumerate().skip(1) {
221 if line.is_empty() {
222 body_start = i + 1;
223 break;
224 }
225 if let Some((key, value)) = line.split_once(':') {
226 headers.push((key.trim().to_string(), value.trim().to_string()));
227 }
228 }
229
230 let body = if body_start > 0 && body_start < lines.len() {
231 Some(lines[body_start..].join("\n"))
232 } else {
233 None
234 };
235
236 Request { method, path, headers, body }
237}
238
239fn build_response(response: &Response) -> Vec<u8> {
240 let status_text = match response.status {
241 200 => "OK",
242 404 => "Not Found",
243 500 => "Internal Server Error",
244 _ => "Unknown",
245 };
246
247 let mut response_str = format!(
248 "HTTP/1.1 {} {}\r\n",
249 response.status, status_text
250 );
251
252 for (key, value) in &response.headers {
253 response_str.push_str(&format!("{}: {}\r\n", key, value));
254 }
255
256 response_str.push_str(&format!("Content-Length: {}\r\n", response.body.len()));
257 response_str.push_str("\r\n");
258 response_str.push_str(&response.body);
259
260 response_str.into_bytes()
261}
262
263pub async fn shutdown() {
264 tokio::signal::ctrl_c().await.ok();
265}