1use std::{cell::Ref, cell::RefCell, cell::RefMut, fmt, net, rc::Rc};
2
3use bitflags::bitflags;
4
5use crate::http::{Method, StatusCode, Uri, Version, h1::Codec, header::HeaderMap};
6use crate::io::{IoBoxed, IoRef, types};
7use crate::util::Extensions;
8
9#[derive(Copy, Clone, PartialEq, Eq, Debug)]
11pub enum ConnectionType {
12 Close,
14 KeepAlive,
16 Upgrade,
18}
19
20bitflags! {
21 #[derive(Copy, Clone, Debug, Eq, PartialEq, Ord, PartialOrd, Hash)]
22 pub(crate) struct Flags: u8 {
23 const CLOSE = 0b0000_0001;
24 const KEEP_ALIVE = 0b0000_0010;
25 const UPGRADE = 0b0000_0100;
26 const EXPECT = 0b0000_1000;
27 const NO_CHUNKING = 0b0001_0000;
28 }
29}
30
31pub(crate) trait Head: Default + 'static + fmt::Debug {
32 fn clear(&mut self);
33
34 fn with_pool<F, R>(f: F) -> R
35 where
36 F: FnOnce(&MessagePool<Self>) -> R;
37}
38
39#[derive(Clone, Debug)]
40pub(crate) enum CurrentIo {
41 Ref(IoRef),
42 Io(Rc<dyn IoAccess>),
43 None,
44}
45
46pub(crate) trait IoAccess: fmt::Debug {
47 fn get(&self) -> Option<&IoRef>;
48
49 fn take(&self) -> Option<(IoBoxed, Codec)>;
50}
51
52impl CurrentIo {
53 pub(crate) fn new(io: Rc<dyn IoAccess>) -> Self {
54 CurrentIo::Io(io)
55 }
56
57 pub(crate) fn as_ref(&self) -> Option<&IoRef> {
58 match self {
59 CurrentIo::Ref(io) => Some(io),
60 CurrentIo::Io(io) => io.get(),
61 CurrentIo::None => None,
62 }
63 }
64
65 pub(crate) fn take(&self) -> Option<(IoBoxed, Codec)> {
66 match self {
67 CurrentIo::Io(io) => io.take(),
68 _ => None,
69 }
70 }
71}
72
73#[derive(Debug)]
74pub struct RequestHead {
75 pub id: usize,
76 pub uri: Uri,
77 pub method: Method,
78 pub version: Version,
79 pub headers: HeaderMap,
80 pub extensions: RefCell<Extensions>,
81 pub(crate) io: CurrentIo,
82 pub(crate) flags: Flags,
83}
84
85impl Default for RequestHead {
86 fn default() -> RequestHead {
87 RequestHead {
88 id: 0,
89 io: CurrentIo::None,
90 uri: Uri::default(),
91 method: Method::default(),
92 version: Version::HTTP_11,
93 headers: HeaderMap::with_capacity(16),
94 flags: Flags::empty(),
95 extensions: RefCell::new(Extensions::new()),
96 }
97 }
98}
99
100impl Head for RequestHead {
101 fn clear(&mut self) {
102 self.io = CurrentIo::None;
103 self.flags = Flags::empty();
104 self.version = Version::HTTP_11;
105 self.headers.clear();
106 self.extensions.get_mut().clear();
107 }
108
109 fn with_pool<F, R>(f: F) -> R
110 where
111 F: FnOnce(&MessagePool<Self>) -> R,
112 {
113 REQUEST_POOL.with(|p| f(p))
114 }
115}
116
117impl RequestHead {
118 #[inline]
120 pub fn extensions(&self) -> Ref<'_, Extensions> {
121 self.extensions.borrow()
122 }
123
124 #[inline]
126 pub fn extensions_mut(&self) -> RefMut<'_, Extensions> {
127 self.extensions.borrow_mut()
128 }
129
130 pub fn headers(&self) -> &HeaderMap {
132 &self.headers
133 }
134
135 pub fn headers_mut(&mut self) -> &mut HeaderMap {
137 &mut self.headers
138 }
139
140 #[inline]
141 pub fn set_connection_type(&mut self, ctype: ConnectionType) {
143 match ctype {
144 ConnectionType::Close => self.flags.insert(Flags::CLOSE),
145 ConnectionType::KeepAlive => self.flags.insert(Flags::KEEP_ALIVE),
146 ConnectionType::Upgrade => self.flags.insert(Flags::UPGRADE),
147 }
148 }
149
150 #[inline]
151 pub fn connection_type(&self) -> ConnectionType {
153 if self.flags.contains(Flags::CLOSE) {
154 ConnectionType::Close
155 } else if self.flags.contains(Flags::KEEP_ALIVE) {
156 ConnectionType::KeepAlive
157 } else if self.flags.contains(Flags::UPGRADE) {
158 ConnectionType::Upgrade
159 } else if self.version < Version::HTTP_11 {
160 ConnectionType::Close
161 } else {
162 ConnectionType::KeepAlive
163 }
164 }
165
166 #[inline]
167 pub fn upgrade(&self) -> bool {
169 self.flags.contains(Flags::UPGRADE)
170 }
171
172 #[inline]
173 pub fn expect(&self) -> bool {
175 self.flags.contains(Flags::EXPECT)
176 }
177
178 #[inline]
179 pub fn chunked(&self) -> bool {
181 !self.flags.contains(Flags::NO_CHUNKING)
182 }
183
184 #[inline]
185 pub fn no_chunking(&mut self, val: bool) {
186 if val {
187 self.flags.insert(Flags::NO_CHUNKING);
188 } else {
189 self.flags.remove(Flags::NO_CHUNKING);
190 }
191 }
192
193 #[inline]
194 pub(crate) fn set_expect(&mut self) {
195 self.flags.insert(Flags::EXPECT);
196 }
197
198 #[inline]
199 pub(crate) fn set_upgrade(&mut self) {
200 self.flags.insert(Flags::UPGRADE);
201 }
202
203 #[inline]
208 pub fn peer_addr(&self) -> Option<net::SocketAddr> {
209 self.io.as_ref().and_then(|io| {
210 io.query::<types::PeerAddr>()
211 .get()
212 .map(types::PeerAddr::into_inner)
213 })
214 }
215
216 pub fn take_io(&self) -> Option<(IoBoxed, Codec)> {
220 self.io.take()
221 }
222
223 #[doc(hidden)]
224 pub fn remove_io(&mut self) {
225 self.io = CurrentIo::None;
226 }
227}
228
229#[derive(Debug)]
230pub struct ResponseHead {
231 pub version: Version,
232 pub status: StatusCode,
233 pub headers: HeaderMap,
234 pub reason: Option<&'static str>,
235 pub(crate) io: CurrentIo,
236 pub(crate) extensions: RefCell<Extensions>,
237 flags: Flags,
238}
239
240impl ResponseHead {
241 #[inline]
243 pub fn new(status: StatusCode) -> ResponseHead {
244 ResponseHead {
245 status,
246 version: Version::default(),
247 headers: HeaderMap::with_capacity(12),
248 reason: None,
249 flags: Flags::empty(),
250 io: CurrentIo::None,
251 extensions: RefCell::new(Extensions::new()),
252 }
253 }
254
255 #[inline]
257 pub fn extensions(&self) -> Ref<'_, Extensions> {
258 self.extensions.borrow()
259 }
260
261 #[inline]
263 pub fn extensions_mut(&self) -> RefMut<'_, Extensions> {
264 self.extensions.borrow_mut()
265 }
266
267 #[inline]
268 pub fn headers(&self) -> &HeaderMap {
270 &self.headers
271 }
272
273 #[inline]
274 pub fn headers_mut(&mut self) -> &mut HeaderMap {
276 &mut self.headers
277 }
278
279 #[inline]
280 pub fn set_connection_type(&mut self, ctype: ConnectionType) {
282 match ctype {
283 ConnectionType::Close => self.flags.insert(Flags::CLOSE),
284 ConnectionType::KeepAlive => self.flags.insert(Flags::KEEP_ALIVE),
285 ConnectionType::Upgrade => self.flags.insert(Flags::UPGRADE),
286 }
287 }
288
289 #[inline]
290 pub fn connection_type(&self) -> ConnectionType {
291 if self.flags.contains(Flags::CLOSE) {
292 ConnectionType::Close
293 } else if self.flags.contains(Flags::KEEP_ALIVE) {
294 ConnectionType::KeepAlive
295 } else if self.flags.contains(Flags::UPGRADE) {
296 ConnectionType::Upgrade
297 } else if self.version < Version::HTTP_11 {
298 ConnectionType::Close
299 } else {
300 ConnectionType::KeepAlive
301 }
302 }
303
304 #[inline]
305 pub fn keep_alive(&self) -> bool {
307 self.connection_type() == ConnectionType::KeepAlive
308 }
309
310 #[inline]
311 pub fn upgrade(&self) -> bool {
313 self.connection_type() == ConnectionType::Upgrade
314 }
315
316 #[inline]
318 pub fn reason(&self) -> &str {
319 if let Some(reason) = self.reason {
320 reason
321 } else {
322 self.status
323 .canonical_reason()
324 .unwrap_or("<unknown status code>")
325 }
326 }
327
328 #[inline]
329 pub(crate) fn ctype(&self) -> Option<ConnectionType> {
330 if self.flags.contains(Flags::CLOSE) {
331 Some(ConnectionType::Close)
332 } else if self.flags.contains(Flags::KEEP_ALIVE) {
333 Some(ConnectionType::KeepAlive)
334 } else if self.flags.contains(Flags::UPGRADE) {
335 Some(ConnectionType::Upgrade)
336 } else {
337 None
338 }
339 }
340
341 #[inline]
342 pub fn chunked(&self) -> bool {
344 !self.flags.contains(Flags::NO_CHUNKING)
345 }
346
347 #[inline]
348 pub fn no_chunking(&mut self, val: bool) {
350 if val {
351 self.flags.insert(Flags::NO_CHUNKING);
352 } else {
353 self.flags.remove(Flags::NO_CHUNKING);
354 }
355 }
356}
357
358impl Default for ResponseHead {
359 fn default() -> Self {
360 Self::new(StatusCode::default())
361 }
362}
363
364impl Head for ResponseHead {
365 fn clear(&mut self) {
366 self.reason = None;
367 self.headers.clear();
368 self.io = CurrentIo::None;
369 self.flags = Flags::empty();
370 }
371
372 fn with_pool<F, R>(f: F) -> R
373 where
374 F: FnOnce(&MessagePool<Self>) -> R,
375 {
376 RESPONSE_POOL.with(|p| f(p))
377 }
378}
379
380#[derive(Debug)]
381pub(crate) struct Message<T: Head> {
382 head: Rc<T>,
383}
384
385impl<T: Head> Message<T> {
386 pub(crate) fn new() -> Self {
388 T::with_pool(MessagePool::get_message)
389 }
390}
391
392impl Message<ResponseHead> {
393 pub(crate) fn with_status(status: StatusCode) -> Self {
395 let mut msg = RESPONSE_POOL.with(MessagePool::get_message);
396 msg.status = status;
397 msg
398 }
399}
400
401impl<T: Head> Clone for Message<T> {
402 fn clone(&self) -> Self {
403 Self {
404 head: self.head.clone(),
405 }
406 }
407}
408
409impl<T: Head> std::ops::Deref for Message<T> {
410 type Target = T;
411
412 fn deref(&self) -> &Self::Target {
413 self.head.as_ref()
414 }
415}
416
417impl<T: Head> std::ops::DerefMut for Message<T> {
418 fn deref_mut(&mut self) -> &mut Self::Target {
419 Rc::get_mut(&mut self.head).expect("Multiple copies exist")
420 }
421}
422
423impl<T: Head> Drop for Message<T> {
424 fn drop(&mut self) {
425 if Rc::strong_count(&self.head) == 1 {
426 T::with_pool(|pool| {
427 let v = &mut pool.0.borrow_mut();
428 if v.len() < 128 {
429 Rc::get_mut(&mut self.head)
430 .expect("Multiple copies exist")
431 .clear();
432 v.push(self.head.clone());
433 }
434 });
435 }
436 }
437}
438
439pub(crate) struct MessagePool<T: Head>(RefCell<Vec<Rc<T>>>);
441
442thread_local!(static REQUEST_POOL: MessagePool<RequestHead> = MessagePool::<RequestHead>::new());
443thread_local!(static RESPONSE_POOL: MessagePool<ResponseHead> = MessagePool::<ResponseHead>::new());
444
445impl<T: Head> MessagePool<T> {
446 fn new() -> MessagePool<T> {
447 MessagePool(RefCell::new(Vec::with_capacity(256)))
448 }
449
450 #[inline]
452 fn get_message(&self) -> Message<T> {
453 let head = if let Some(mut msg) = self.0.borrow_mut().pop() {
454 if let Some(msg) = Rc::get_mut(&mut msg) {
455 msg.clear();
456 }
457 msg
458 } else {
459 Rc::new(T::default())
460 };
461 Message { head }
462 }
463}