xitca_http/h1/
dispatcher_unreal.rs

1use core::mem::MaybeUninit;
2
3use std::{io, rc::Rc};
4
5use http::StatusCode;
6use httparse::{Header, Status};
7use xitca_io::{
8    bytes::{Buf, BytesMut},
9    io::{AsyncIo, Interest},
10    net::TcpStream,
11};
12use xitca_service::Service;
13use xitca_unsafe_collection::bytes::read_buf;
14
15use crate::date::{DateTime, DateTimeHandle, DateTimeService};
16
17pub type Error = Box<dyn std::error::Error + Send + Sync>;
18
19pub struct Request<'a, C> {
20    pub method: &'a str,
21    pub path: &'a str,
22    pub headers: &'a mut [Header<'a>],
23    pub ctx: &'a C,
24}
25
26pub struct Response<'a, const STEP: usize = 1> {
27    buf: &'a mut BytesMut,
28    date: &'a DateTimeHandle,
29}
30
31impl<'a> Response<'a> {
32    pub fn status(self, status: StatusCode) -> Response<'a, 2> {
33        if status == StatusCode::OK {
34            self.buf.extend_from_slice(b"HTTP/1.1 200 OK");
35        } else {
36            self.buf.extend_from_slice(b"HTTP/1.1 ");
37            let reason = status.canonical_reason().unwrap_or("<none>").as_bytes();
38            let status = status.as_str().as_bytes();
39            self.buf.extend_from_slice(status);
40            self.buf.extend_from_slice(b" ");
41            self.buf.extend_from_slice(reason);
42        }
43
44        Response {
45            buf: self.buf,
46            date: self.date,
47        }
48    }
49}
50
51impl<'a> Response<'a, 2> {
52    pub fn header(self, key: &str, val: &str) -> Self {
53        let key = key.as_bytes();
54        let val = val.as_bytes();
55
56        self.buf.reserve(key.len() + val.len() + 4);
57        self.buf.extend_from_slice(b"\r\n");
58        self.buf.extend_from_slice(key);
59        self.buf.extend_from_slice(b": ");
60        self.buf.extend_from_slice(val);
61        self
62    }
63
64    pub fn body(self, body: &[u8]) -> Response<'a, 3> {
65        super::proto::encode::write_length_header(self.buf, body.len());
66        self.body_writer(|buf| buf.extend_from_slice(body))
67    }
68
69    pub fn body_writer<F>(mut self, func: F) -> Response<'a, 3>
70    where
71        F: for<'b> FnOnce(&'b mut BytesMut),
72    {
73        self.try_write_date();
74
75        self.buf.extend_from_slice(b"\r\n\r\n");
76
77        func(self.buf);
78
79        Response {
80            buf: self.buf,
81            date: self.date,
82        }
83    }
84
85    fn try_write_date(&mut self) {
86        self.buf.reserve(DateTimeHandle::DATE_VALUE_LENGTH + 12);
87        self.buf.extend_from_slice(b"\r\ndate: ");
88        self.date.with_date(|date| self.buf.extend_from_slice(date));
89    }
90}
91
92pub struct Dispatcher<F, C> {
93    handler: F,
94    ctx: C,
95    date: DateTimeService,
96}
97
98impl<F, C> Dispatcher<F, C> {
99    pub fn new(handler: F, ctx: C) -> Rc<Self> {
100        Rc::new(Self {
101            handler,
102            ctx,
103            date: DateTimeService::new(),
104        })
105    }
106}
107
108impl<F, C> Service<TcpStream> for Dispatcher<F, C>
109where
110    F: for<'h, 'b> AsyncFn(Request<'h, C>, Response<'h>) -> Response<'h, 3>,
111{
112    type Response = ();
113    type Error = Error;
114
115    async fn call(&self, mut stream: TcpStream) -> Result<Self::Response, Self::Error> {
116        let mut r_buf = BytesMut::with_capacity(4096);
117        let mut w_buf = BytesMut::with_capacity(4096);
118
119        let mut read_closed = false;
120
121        loop {
122            stream.ready(Interest::READABLE).await?;
123
124            loop {
125                match read_buf(&mut stream, &mut r_buf) {
126                    Ok(0) => {
127                        if core::mem::replace(&mut read_closed, true) {
128                            return Ok(());
129                        }
130                        break;
131                    }
132                    Ok(_) => {}
133                    Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => break,
134                    Err(e) => return Err(e.into()),
135                }
136            }
137
138            loop {
139                let mut headers = [const { MaybeUninit::uninit() }; 16];
140
141                let mut req = httparse::Request::new(&mut []);
142
143                match req.parse_with_uninit_headers(r_buf.chunk(), &mut headers)? {
144                    Status::Complete(len) => {
145                        let req = Request {
146                            path: req.path.unwrap(),
147                            method: req.method.unwrap(),
148                            headers: req.headers,
149                            ctx: &self.ctx,
150                        };
151
152                        let res = Response {
153                            buf: &mut w_buf,
154                            date: self.date.get(),
155                        };
156
157                        (self.handler)(req, res).await;
158
159                        r_buf.advance(len);
160                    }
161                    Status::Partial => break,
162                };
163            }
164
165            let mut written = 0;
166
167            while written != w_buf.len() {
168                match io::Write::write(&mut stream, &w_buf[written..]) {
169                    Ok(0) => return Err(io::Error::from(io::ErrorKind::WriteZero).into()),
170                    Ok(n) => written += n,
171                    Err(e) if e.kind() == io::ErrorKind::WouldBlock => {
172                        stream.ready(Interest::WRITABLE).await?;
173                    }
174                    Err(e) => return Err(e.into()),
175                }
176            }
177
178            w_buf.clear();
179        }
180    }
181}