Skip to main content

ntex/http/
message.rs

1use std::{cell::Ref, cell::RefCell, cell::RefMut, fmt, net, rc::Rc};
2
3use bitflags::bitflags;
4
5use crate::http::{Method, StatusCode, Uri, Version, h1::Codec, header::HeaderMap};
6use crate::io::{IoBoxed, IoRef, types};
7use crate::util::Extensions;
8
9/// Represents various types of connection
10#[derive(Copy, Clone, PartialEq, Eq, Debug)]
11pub enum ConnectionType {
12    /// Close connection after response
13    Close,
14    /// Keep connection alive after response
15    KeepAlive,
16    /// Connection is upgraded to different type
17    Upgrade,
18}
19
20bitflags! {
21    #[derive(Copy, Clone, Debug, Eq, PartialEq, Ord, PartialOrd, Hash)]
22    pub(crate) struct Flags: u8 {
23        const CLOSE       = 0b0000_0001;
24        const KEEP_ALIVE  = 0b0000_0010;
25        const UPGRADE     = 0b0000_0100;
26        const EXPECT      = 0b0000_1000;
27        const NO_CHUNKING = 0b0001_0000;
28    }
29}
30
31pub(crate) trait Head: Default + 'static + fmt::Debug {
32    fn clear(&mut self);
33
34    fn with_pool<F, R>(f: F) -> R
35    where
36        F: FnOnce(&MessagePool<Self>) -> R;
37}
38
39#[derive(Clone, Debug)]
40pub(crate) enum CurrentIo {
41    Ref(IoRef),
42    Io(Rc<dyn IoAccess>),
43    None,
44}
45
46pub(crate) trait IoAccess: fmt::Debug {
47    fn get(&self) -> Option<&IoRef>;
48
49    fn take(&self) -> Option<(IoBoxed, Codec)>;
50}
51
52impl CurrentIo {
53    pub(crate) fn new(io: Rc<dyn IoAccess>) -> Self {
54        CurrentIo::Io(io)
55    }
56
57    pub(crate) fn as_ref(&self) -> Option<&IoRef> {
58        match self {
59            CurrentIo::Ref(io) => Some(io),
60            CurrentIo::Io(io) => io.get(),
61            CurrentIo::None => None,
62        }
63    }
64
65    pub(crate) fn take(&self) -> Option<(IoBoxed, Codec)> {
66        match self {
67            CurrentIo::Io(io) => io.take(),
68            _ => None,
69        }
70    }
71}
72
73#[derive(Debug)]
74pub struct RequestHead {
75    pub id: usize,
76    pub uri: Uri,
77    pub method: Method,
78    pub version: Version,
79    pub headers: HeaderMap,
80    pub extensions: RefCell<Extensions>,
81    pub(crate) io: CurrentIo,
82    pub(crate) flags: Flags,
83}
84
85impl Default for RequestHead {
86    fn default() -> RequestHead {
87        RequestHead {
88            id: 0,
89            io: CurrentIo::None,
90            uri: Uri::default(),
91            method: Method::default(),
92            version: Version::HTTP_11,
93            headers: HeaderMap::with_capacity(16),
94            flags: Flags::empty(),
95            extensions: RefCell::new(Extensions::new()),
96        }
97    }
98}
99
100impl Head for RequestHead {
101    fn clear(&mut self) {
102        self.io = CurrentIo::None;
103        self.flags = Flags::empty();
104        self.version = Version::HTTP_11;
105        self.headers.clear();
106        self.extensions.get_mut().clear();
107    }
108
109    fn with_pool<F, R>(f: F) -> R
110    where
111        F: FnOnce(&MessagePool<Self>) -> R,
112    {
113        REQUEST_POOL.with(|p| f(p))
114    }
115}
116
117impl RequestHead {
118    /// Message extensions
119    #[inline]
120    pub fn extensions(&self) -> Ref<'_, Extensions> {
121        self.extensions.borrow()
122    }
123
124    /// Mutable reference to a the message's extensions
125    #[inline]
126    pub fn extensions_mut(&self) -> RefMut<'_, Extensions> {
127        self.extensions.borrow_mut()
128    }
129
130    /// Read the message headers.
131    pub fn headers(&self) -> &HeaderMap {
132        &self.headers
133    }
134
135    /// Mutable reference to the message headers.
136    pub fn headers_mut(&mut self) -> &mut HeaderMap {
137        &mut self.headers
138    }
139
140    #[inline]
141    /// Set connection type of the message
142    pub fn set_connection_type(&mut self, ctype: ConnectionType) {
143        match ctype {
144            ConnectionType::Close => self.flags.insert(Flags::CLOSE),
145            ConnectionType::KeepAlive => self.flags.insert(Flags::KEEP_ALIVE),
146            ConnectionType::Upgrade => self.flags.insert(Flags::UPGRADE),
147        }
148    }
149
150    #[inline]
151    /// Connection type
152    pub fn connection_type(&self) -> ConnectionType {
153        if self.flags.contains(Flags::CLOSE) {
154            ConnectionType::Close
155        } else if self.flags.contains(Flags::KEEP_ALIVE) {
156            ConnectionType::KeepAlive
157        } else if self.flags.contains(Flags::UPGRADE) {
158            ConnectionType::Upgrade
159        } else if self.version < Version::HTTP_11 {
160            ConnectionType::Close
161        } else {
162            ConnectionType::KeepAlive
163        }
164    }
165
166    #[inline]
167    /// Connection upgrade status
168    pub fn upgrade(&self) -> bool {
169        self.flags.contains(Flags::UPGRADE)
170    }
171
172    #[inline]
173    /// Request contains `EXPECT` header
174    pub fn expect(&self) -> bool {
175        self.flags.contains(Flags::EXPECT)
176    }
177
178    #[inline]
179    /// Get response body chunking state
180    pub fn chunked(&self) -> bool {
181        !self.flags.contains(Flags::NO_CHUNKING)
182    }
183
184    #[inline]
185    pub fn no_chunking(&mut self, val: bool) {
186        if val {
187            self.flags.insert(Flags::NO_CHUNKING);
188        } else {
189            self.flags.remove(Flags::NO_CHUNKING);
190        }
191    }
192
193    #[inline]
194    pub(crate) fn set_expect(&mut self) {
195        self.flags.insert(Flags::EXPECT);
196    }
197
198    #[inline]
199    pub(crate) fn set_upgrade(&mut self) {
200        self.flags.insert(Flags::UPGRADE);
201    }
202
203    /// Peer socket address
204    ///
205    /// Peer address is actual socket address, if proxy is used in front of
206    /// ntex http server, then peer address would be address of this proxy.
207    #[inline]
208    pub fn peer_addr(&self) -> Option<net::SocketAddr> {
209        self.io.as_ref().and_then(|io| {
210            io.query::<types::PeerAddr>()
211                .get()
212                .map(types::PeerAddr::into_inner)
213        })
214    }
215
216    /// Take io and codec for current request
217    ///
218    /// This objects are set only for upgrade requests
219    pub fn take_io(&self) -> Option<(IoBoxed, Codec)> {
220        self.io.take()
221    }
222
223    #[doc(hidden)]
224    pub fn remove_io(&mut self) {
225        self.io = CurrentIo::None;
226    }
227}
228
229#[derive(Debug)]
230pub struct ResponseHead {
231    pub version: Version,
232    pub status: StatusCode,
233    pub headers: HeaderMap,
234    pub reason: Option<&'static str>,
235    pub(crate) io: CurrentIo,
236    pub(crate) extensions: RefCell<Extensions>,
237    flags: Flags,
238}
239
240impl ResponseHead {
241    /// Create new instance of `ResponseHead` type
242    #[inline]
243    pub fn new(status: StatusCode) -> ResponseHead {
244        ResponseHead {
245            status,
246            version: Version::default(),
247            headers: HeaderMap::with_capacity(12),
248            reason: None,
249            flags: Flags::empty(),
250            io: CurrentIo::None,
251            extensions: RefCell::new(Extensions::new()),
252        }
253    }
254
255    /// Message extensions
256    #[inline]
257    pub fn extensions(&self) -> Ref<'_, Extensions> {
258        self.extensions.borrow()
259    }
260
261    /// Mutable reference to a the message's extensions
262    #[inline]
263    pub fn extensions_mut(&self) -> RefMut<'_, Extensions> {
264        self.extensions.borrow_mut()
265    }
266
267    #[inline]
268    /// Read the message headers.
269    pub fn headers(&self) -> &HeaderMap {
270        &self.headers
271    }
272
273    #[inline]
274    /// Mutable reference to the message headers.
275    pub fn headers_mut(&mut self) -> &mut HeaderMap {
276        &mut self.headers
277    }
278
279    #[inline]
280    /// Set connection type of the message
281    pub fn set_connection_type(&mut self, ctype: ConnectionType) {
282        match ctype {
283            ConnectionType::Close => self.flags.insert(Flags::CLOSE),
284            ConnectionType::KeepAlive => self.flags.insert(Flags::KEEP_ALIVE),
285            ConnectionType::Upgrade => self.flags.insert(Flags::UPGRADE),
286        }
287    }
288
289    #[inline]
290    pub fn connection_type(&self) -> ConnectionType {
291        if self.flags.contains(Flags::CLOSE) {
292            ConnectionType::Close
293        } else if self.flags.contains(Flags::KEEP_ALIVE) {
294            ConnectionType::KeepAlive
295        } else if self.flags.contains(Flags::UPGRADE) {
296            ConnectionType::Upgrade
297        } else if self.version < Version::HTTP_11 {
298            ConnectionType::Close
299        } else {
300            ConnectionType::KeepAlive
301        }
302    }
303
304    #[inline]
305    /// Check if keep-alive is enabled
306    pub fn keep_alive(&self) -> bool {
307        self.connection_type() == ConnectionType::KeepAlive
308    }
309
310    #[inline]
311    /// Check upgrade status of this message
312    pub fn upgrade(&self) -> bool {
313        self.connection_type() == ConnectionType::Upgrade
314    }
315
316    /// Get custom reason for the response
317    #[inline]
318    pub fn reason(&self) -> &str {
319        if let Some(reason) = self.reason {
320            reason
321        } else {
322            self.status
323                .canonical_reason()
324                .unwrap_or("<unknown status code>")
325        }
326    }
327
328    #[inline]
329    pub(crate) fn ctype(&self) -> Option<ConnectionType> {
330        if self.flags.contains(Flags::CLOSE) {
331            Some(ConnectionType::Close)
332        } else if self.flags.contains(Flags::KEEP_ALIVE) {
333            Some(ConnectionType::KeepAlive)
334        } else if self.flags.contains(Flags::UPGRADE) {
335            Some(ConnectionType::Upgrade)
336        } else {
337            None
338        }
339    }
340
341    #[inline]
342    /// Get response body chunking state
343    pub fn chunked(&self) -> bool {
344        !self.flags.contains(Flags::NO_CHUNKING)
345    }
346
347    #[inline]
348    /// Set no chunking for payload
349    pub fn no_chunking(&mut self, val: bool) {
350        if val {
351            self.flags.insert(Flags::NO_CHUNKING);
352        } else {
353            self.flags.remove(Flags::NO_CHUNKING);
354        }
355    }
356}
357
358impl Default for ResponseHead {
359    fn default() -> Self {
360        Self::new(StatusCode::default())
361    }
362}
363
364impl Head for ResponseHead {
365    fn clear(&mut self) {
366        self.reason = None;
367        self.headers.clear();
368        self.io = CurrentIo::None;
369        self.flags = Flags::empty();
370    }
371
372    fn with_pool<F, R>(f: F) -> R
373    where
374        F: FnOnce(&MessagePool<Self>) -> R,
375    {
376        RESPONSE_POOL.with(|p| f(p))
377    }
378}
379
380#[derive(Debug)]
381pub(crate) struct Message<T: Head> {
382    head: Rc<T>,
383}
384
385impl<T: Head> Message<T> {
386    /// Get new message from the pool of objects
387    pub(crate) fn new() -> Self {
388        T::with_pool(MessagePool::get_message)
389    }
390}
391
392impl Message<ResponseHead> {
393    /// Get new message from the pool of objects
394    pub(crate) fn with_status(status: StatusCode) -> Self {
395        let mut msg = RESPONSE_POOL.with(MessagePool::get_message);
396        msg.status = status;
397        msg
398    }
399}
400
401impl<T: Head> Clone for Message<T> {
402    fn clone(&self) -> Self {
403        Self {
404            head: self.head.clone(),
405        }
406    }
407}
408
409impl<T: Head> std::ops::Deref for Message<T> {
410    type Target = T;
411
412    fn deref(&self) -> &Self::Target {
413        self.head.as_ref()
414    }
415}
416
417impl<T: Head> std::ops::DerefMut for Message<T> {
418    fn deref_mut(&mut self) -> &mut Self::Target {
419        Rc::get_mut(&mut self.head).expect("Multiple copies exist")
420    }
421}
422
423impl<T: Head> Drop for Message<T> {
424    fn drop(&mut self) {
425        if Rc::strong_count(&self.head) == 1 {
426            T::with_pool(|pool| {
427                let v = &mut pool.0.borrow_mut();
428                if v.len() < 128 {
429                    Rc::get_mut(&mut self.head)
430                        .expect("Multiple copies exist")
431                        .clear();
432                    v.push(self.head.clone());
433                }
434            });
435        }
436    }
437}
438
439/// Request's objects pool
440pub(crate) struct MessagePool<T: Head>(RefCell<Vec<Rc<T>>>);
441
442thread_local!(static REQUEST_POOL: MessagePool<RequestHead> = MessagePool::<RequestHead>::new());
443thread_local!(static RESPONSE_POOL: MessagePool<ResponseHead> = MessagePool::<ResponseHead>::new());
444
445impl<T: Head> MessagePool<T> {
446    fn new() -> MessagePool<T> {
447        MessagePool(RefCell::new(Vec::with_capacity(256)))
448    }
449
450    /// Get message from the pool
451    #[inline]
452    fn get_message(&self) -> Message<T> {
453        let head = if let Some(mut msg) = self.0.borrow_mut().pop() {
454            if let Some(msg) = Rc::get_mut(&mut msg) {
455                msg.clear();
456            }
457            msg
458        } else {
459            Rc::new(T::default())
460        };
461        Message { head }
462    }
463}