1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
use bytes::BufMut;
use bytes::BytesMut;
use mio::event::Event;
use crate::handler::Handler;
use crate::http::{self, ParseStatus};
use mio::net::TcpStream;
use std::io::ErrorKind;
use std::io::Read;
use std::io::Write;
const MAX_REQUEST_SIZE: usize = 8192;
#[derive(PartialEq)]
pub enum ConnectionState {
Reading,
Writing,
Closed,
}
pub struct Connection {
pub read_buffer: BytesMut,
pub write_buffer: BytesMut,
state: ConnectionState,
socket: TcpStream,
}
impl Connection {
pub fn new(socket: TcpStream, read_buffer: BytesMut, write_buffer: BytesMut) -> Self {
Connection {
state: ConnectionState::Reading,
read_buffer,
write_buffer,
socket,
}
}
/// The state machine coordinator.
/// Dispatches I/O tasks based on current state and event.
pub fn process<H: Handler>(&mut self, event: &Event, handler: &H) {
// Handle the read phase
// We only read if the socket is ready AND we are expecting a request.
if event.is_readable() && self.state == ConnectionState::Reading {
match self.read() {
Ok(0) => {
// Clean close: peer shut down the connection
self.state = ConnectionState::Closed;
return;
}
Ok(n) if n > 0 => {
// Data arrived, Try to parse it into an HTTP Request.
self.handle_request(handler);
}
Ok(_) => (), // WouldBlock received, so no more data to read for now
Err(_) => {
// Fatal socket error
self.state = ConnectionState::Closed;
return;
}
}
}
// Handles the write phase
if self.state == ConnectionState::Writing {
match self.write() {
Ok(_) => {
// write() will set state to Closed once the buffer is fully drained.
// If it was a partial write, state remains Writing.
}
Err(_) => {
self.state = ConnectionState::Closed;
}
}
}
}
/// Reads data from the socket directly into the pooled BytesMut.
/// Returns the number of bytes read in this call.
pub fn read(&mut self) -> std::io::Result<usize> {
let mut bytes_read_this_turn = 0;
loop {
// Hard limit check
// Prevents malicious clients from causing Out-Of-Memory (OOM) via Slowloris.
if self.read_buffer.len() >= MAX_REQUEST_SIZE {
self.state = ConnectionState::Closed;
return Err(std::io::Error::new(
ErrorKind::InvalidData,
"Request header too large",
));
}
// Ensure there is space to read.
if self.read_buffer.remaining_mut() < 1024 {
let space_left = MAX_REQUEST_SIZE - self.read_buffer.len();
// If we can't even fit 1 more byte, stop reading.
if space_left == 0 {
break;
}
// Reserve a chunk, but never more than our Hard Limit.
self.read_buffer.reserve(std::cmp::min(1024, space_left));
}
let n = unsafe {
// Get raw pointer to the start of uninitialized capacity
let ptr = self.read_buffer.as_mut_ptr().add(self.read_buffer.len());
let cap = self.read_buffer.capacity() - self.read_buffer.len();
// Create a slice that points directly to our Pool Memory
let slice = std::slice::from_raw_parts_mut(ptr, cap);
match self.socket.read(slice) {
Ok(0) => {
// Clean EOF: Client closed the connection
self.state = ConnectionState::Closed;
return Ok(bytes_read_this_turn);
}
Ok(n) => n,
Err(ref e) if e.kind() == ErrorKind::WouldBlock => {
// No more data in the Kernel's buffer for this event
return Ok(bytes_read_this_turn);
}
Err(e) => {
self.state = ConnectionState::Closed;
return Err(e);
}
}
};
// Commit the bytes, by updating the new end of the read_buffer
unsafe {
let new_len = self.read_buffer.len() + n;
self.read_buffer.set_len(new_len);
}
bytes_read_this_turn += n;
// Loop continues to "drain" the socket until it would block.
}
Ok(bytes_read_this_turn)
}
/// Writes the contents of write_buffer to the socket.
/// Returns the number of bytes written in this call.
pub fn write(&mut self) -> std::io::Result<usize> {
let mut bytes_written_this_turn = 0;
// Drain the buffer
while !self.write_buffer.is_empty() {
match self.socket.write(&self.write_buffer) {
Ok(0) => {
// A write of 0 usually means the connection was dropped by the peer
self.state = ConnectionState::Closed;
return Ok(bytes_written_this_turn);
}
Ok(n) => {
// Zero-copy
let _ = self.write_buffer.split_to(n);
bytes_written_this_turn += n;
}
Err(ref e) if e.kind() == ErrorKind::WouldBlock => {
// Backpressure handling: the kernel send buffer is full, so stop writing and
// return, staying in the Writing state so the next event triggers this method
// again
return Ok(bytes_written_this_turn);
}
Err(e) => {
// Fatal socket error
self.state = ConnectionState::Closed;
return Err(e);
}
}
}
// If the loop finishes, the buffer is empty.
// TODO: when HTTP Keep-Alive is implemented, switch back to Reading instead.
self.state = ConnectionState::Closed;
Ok(bytes_written_this_turn)
}
/// Attempts to parse and process an HTTP request from the internal read buffer.
///
/// This method implements a zero-copy approach:
/// 1. It uses `httparse` to find the boundaries of the request without copying data.
/// 2. If a full request is found, it "consumes" those bytes from the `read_buffer`
/// using `split_to`, which is an O(1) operation that simply moves a pointer.
/// 3. It then generates a response and transitions the connection state to `Writing`.
///
/// # Pipelining Support
/// If the `read_buffer` contains more than one request (Pipelining), this method
/// only consumes the first request. The remaining bytes stay in the buffer to
/// be processed in the next cycle.
pub fn handle_request<H: Handler>(&mut self, handler:&H) {
// Storage for headers (httparse needs a place to put references)
let mut header_storage = [httparse::EMPTY_HEADER; 64];
// Attempt to parse the read_buffer
match http::parse_request(&self.read_buffer, &mut header_storage) {
ParseStatus::Complete(req, amt) => {
// full HTTP Request
// For now, ignore the request and just generate a hardcoded response
let resp = handler.handle(req);
// Encode the response into the write_buffer
resp.encode(&mut self.write_buffer);
// Consume the parsed bytes from the read_buffer
// If there's extra data (like a second request), it stays in read_buffer
let _ = self.read_buffer.split_to(amt);
// Transition state to start sending the data
self.state = ConnectionState::Writing;
}
ParseStatus::Partial => {
// The HTTP request wasn't fully read, so stay in the Reading state wating for the
// next event
}
ParseStatus::Error(_) => {
// Malformed HTTP, close the connection
self.state = ConnectionState::Closed;
}
}
}
pub fn socket(&mut self) -> &mut TcpStream {
&mut self.socket
}
pub fn state(&self) -> &ConnectionState {
&self.state
}
pub fn get_buffers(self) -> (BytesMut, BytesMut) {
(self.read_buffer, self.write_buffer)
}
}