xitca_http/h1/
dispatcher_unreal.rs1use core::mem::MaybeUninit;
2
3use std::{io, rc::Rc};
4
5use http::StatusCode;
6use httparse::{Header, Status};
7use xitca_io::{
8 bytes::{Buf, BytesMut},
9 io::{AsyncIo, Interest},
10 net::TcpStream,
11};
12use xitca_service::Service;
13use xitca_unsafe_collection::bytes::read_buf;
14
15use crate::date::{DateTime, DateTimeHandle, DateTimeService};
16
/// Boxed dynamic error (`Send + Sync`) used as the dispatcher's service error type.
pub type Error = Box<dyn std::error::Error + Send + Sync>;
18
19pub struct Request<'a, C> {
20 pub method: &'a str,
21 pub path: &'a str,
22 pub headers: &'a mut [Header<'a>],
23 pub ctx: &'a C,
24}
25
26pub struct Response<'a, const STEP: usize = 1> {
27 buf: &'a mut BytesMut,
28 date: &'a DateTimeHandle,
29}
30
31impl<'a> Response<'a> {
32 pub fn status(self, status: StatusCode) -> Response<'a, 2> {
33 if status == StatusCode::OK {
34 self.buf.extend_from_slice(b"HTTP/1.1 200 OK");
35 } else {
36 self.buf.extend_from_slice(b"HTTP/1.1 ");
37 let reason = status.canonical_reason().unwrap_or("<none>").as_bytes();
38 let status = status.as_str().as_bytes();
39 self.buf.extend_from_slice(status);
40 self.buf.extend_from_slice(b" ");
41 self.buf.extend_from_slice(reason);
42 }
43
44 Response {
45 buf: self.buf,
46 date: self.date,
47 }
48 }
49}
50
51impl<'a> Response<'a, 2> {
52 pub fn header(self, key: &str, val: &str) -> Self {
53 let key = key.as_bytes();
54 let val = val.as_bytes();
55
56 self.buf.reserve(key.len() + val.len() + 4);
57 self.buf.extend_from_slice(b"\r\n");
58 self.buf.extend_from_slice(key);
59 self.buf.extend_from_slice(b": ");
60 self.buf.extend_from_slice(val);
61 self
62 }
63
64 pub fn body(self, body: &[u8]) -> Response<'a, 3> {
65 super::proto::encode::write_length_header(self.buf, body.len());
66 self.body_writer(|buf| buf.extend_from_slice(body))
67 }
68
69 pub fn body_writer<F>(mut self, func: F) -> Response<'a, 3>
70 where
71 F: for<'b> FnOnce(&'b mut BytesMut),
72 {
73 self.try_write_date();
74
75 self.buf.extend_from_slice(b"\r\n\r\n");
76
77 func(self.buf);
78
79 Response {
80 buf: self.buf,
81 date: self.date,
82 }
83 }
84
85 fn try_write_date(&mut self) {
86 self.buf.reserve(DateTimeHandle::DATE_VALUE_LENGTH + 12);
87 self.buf.extend_from_slice(b"\r\ndate: ");
88 self.date.with_date(|date| self.buf.extend_from_slice(date));
89 }
90}
91
92pub struct Dispatcher<F, C> {
93 handler: F,
94 ctx: C,
95 date: DateTimeService,
96}
97
98impl<F, C> Dispatcher<F, C> {
99 pub fn new(handler: F, ctx: C) -> Rc<Self> {
100 Rc::new(Self {
101 handler,
102 ctx,
103 date: DateTimeService::new(),
104 })
105 }
106}
107
108impl<F, C> Service<TcpStream> for Dispatcher<F, C>
109where
110 F: for<'h, 'b> AsyncFn(Request<'h, C>, Response<'h>) -> Response<'h, 3>,
111{
112 type Response = ();
113 type Error = Error;
114
115 async fn call(&self, mut stream: TcpStream) -> Result<Self::Response, Self::Error> {
116 let mut r_buf = BytesMut::with_capacity(4096);
117 let mut w_buf = BytesMut::with_capacity(4096);
118
119 let mut read_closed = false;
120
121 loop {
122 stream.ready(Interest::READABLE).await?;
123
124 loop {
125 match read_buf(&mut stream, &mut r_buf) {
126 Ok(0) => {
127 if core::mem::replace(&mut read_closed, true) {
128 return Ok(());
129 }
130 break;
131 }
132 Ok(_) => {}
133 Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => break,
134 Err(e) => return Err(e.into()),
135 }
136 }
137
138 loop {
139 let mut headers = [const { MaybeUninit::uninit() }; 16];
140
141 let mut req = httparse::Request::new(&mut []);
142
143 match req.parse_with_uninit_headers(r_buf.chunk(), &mut headers)? {
144 Status::Complete(len) => {
145 let req = Request {
146 path: req.path.unwrap(),
147 method: req.method.unwrap(),
148 headers: req.headers,
149 ctx: &self.ctx,
150 };
151
152 let res = Response {
153 buf: &mut w_buf,
154 date: self.date.get(),
155 };
156
157 (self.handler)(req, res).await;
158
159 r_buf.advance(len);
160 }
161 Status::Partial => break,
162 };
163 }
164
165 let mut written = 0;
166
167 while written != w_buf.len() {
168 match io::Write::write(&mut stream, &w_buf[written..]) {
169 Ok(0) => return Err(io::Error::from(io::ErrorKind::WriteZero).into()),
170 Ok(n) => written += n,
171 Err(e) if e.kind() == io::ErrorKind::WouldBlock => {
172 stream.ready(Interest::WRITABLE).await?;
173 }
174 Err(e) => return Err(e.into()),
175 }
176 }
177
178 w_buf.clear();
179 }
180 }
181}