1pub mod error;
8
9use crate::error::HyperError;
10use embedded_svc::http::client::Connection;
11use embedded_svc::http::{Headers, Method, Status};
12use embedded_svc::io::{ErrorType, Read, Write};
13use http_body_util::{BodyExt, Full};
14use hyper::body::{Bytes, Incoming};
15use hyper::header::{HeaderName, HeaderValue};
16use hyper::{HeaderMap, Request, Response};
17use hyper_tls::HttpsConnector;
18use hyper_util::client::legacy::Client;
19use hyper_util::client::legacy::connect::HttpConnector;
20use hyper_util::rt::TokioExecutor;
21use tokio::runtime::Runtime;
22
/// Initial capacity, in bytes, reserved for the request-body write buffer.
const DEFAULT_BUFFER_SIZE: usize = 8192;

/// Hyper client type used by this module: an HTTPS connector layered over the
/// plain HTTP connector, sending request bodies held as a single `Bytes` chunk.
type HyperClient = Client<HttpsConnector<HttpConnector>, Full<Bytes>>;
28
/// Blocking HTTP connection that drives a hyper client on an owned Tokio runtime.
///
/// The value keeps at most one pending request and one received response at a
/// time, together with the byte buffers used for the request and response bodies.
pub struct HyperHttpConnection {
    /// Runtime on which the client's futures are run to completion.
    rt: Runtime,
    /// Hyper client used to send requests.
    client: HyperClient,
    /// Request prepared by `initiate_request` and not yet sent.
    request: Option<Request<Full<Bytes>>>,
    /// Response received by `initiate_response`, if any.
    response: Option<Response<Incoming>>,
    /// Response-body bytes that have not yet been handed out by `read`.
    read_buffer: Bytes,
    /// Request-body bytes accumulated by `write`.
    write_buffer: Vec<u8>,
}
54
55impl HyperHttpConnection {
56 pub fn new() -> Result<Self, HyperError> {
62 let https = HttpsConnector::new();
63 let client = Client::builder(TokioExecutor::new()).build(https);
64 let rt = Runtime::new().map_err(HyperError::RuntimeCreation)?;
65
66 Ok(Self {
67 rt,
68 client,
69 request: None,
70 response: None,
71 read_buffer: Bytes::new(),
72 write_buffer: Vec::with_capacity(DEFAULT_BUFFER_SIZE),
73 })
74 }
75
76 fn map_method(method: Method) -> Result<hyper::Method, HyperError> {
80 match method {
81 Method::Delete => Ok(hyper::Method::DELETE),
82 Method::Get => Ok(hyper::Method::GET),
83 Method::Head => Ok(hyper::Method::HEAD),
84 Method::Post => Ok(hyper::Method::POST),
85 Method::Put => Ok(hyper::Method::PUT),
86 Method::Connect => Ok(hyper::Method::CONNECT),
87 Method::Options => Ok(hyper::Method::OPTIONS),
88 Method::Trace => Ok(hyper::Method::TRACE),
89 Method::Patch => Ok(hyper::Method::PATCH),
90 _ => Err(HyperError::UnsupportedMethod(format!("{method:?}"))),
91 }
92 }
93
94 fn build_headers(headers: &[(&str, &str)]) -> Result<HeaderMap, HyperError> {
99 let mut header_map = HeaderMap::with_capacity(headers.len());
100
101 for &(name, value) in headers {
102 let header_name =
103 HeaderName::from_bytes(name.as_bytes()).map_err(HyperError::InvalidHeaderName)?;
104 let header_value =
105 HeaderValue::from_str(value).map_err(HyperError::InvalidHeaderValue)?;
106 header_map.insert(header_name, header_value);
107 }
108
109 Ok(header_map)
110 }
111
112 fn ensure_response(&self) -> Result<&Response<Incoming>, HyperError> {
116 self.response.as_ref().ok_or(HyperError::NoResponse)
117 }
118
119 fn load_response_body(&mut self) -> Result<(), HyperError> {
124 if let Some(mut response) = self.response.take() {
125 let body_future = response.body_mut().collect();
126 let body = self.rt.block_on(body_future).map_err(HyperError::Hyper)?;
127 self.read_buffer = body.to_bytes();
128 }
129 Ok(())
130 }
131}
132
133impl Default for HyperHttpConnection {
134 fn default() -> Self {
138 Self::new().expect("Failed to create HyperHttpConnection")
139 }
140}
141
/// Every I/O operation on the connection reports failures as [`HyperError`].
impl ErrorType for HyperHttpConnection {
    type Error = HyperError;
}
146
147impl Status for HyperHttpConnection {
148 fn status(&self) -> u16 {
150 self.ensure_response()
151 .map(|response| response.status().as_u16())
152 .unwrap_or(500)
153 }
154
155 fn status_message(&self) -> Option<&'_ str> {
157 self.ensure_response()
158 .ok()
159 .and_then(|response| response.status().canonical_reason())
160 }
161}
162
163impl Headers for HyperHttpConnection {
164 fn header(&self, name: &str) -> Option<&'_ str> {
166 self.ensure_response()
167 .ok()
168 .and_then(|response| response.headers().get(name))
169 .and_then(|value| value.to_str().ok())
170 }
171}
172
173impl Read for HyperHttpConnection {
174 fn read(&mut self, buffer: &mut [u8]) -> Result<usize, Self::Error> {
177 if self.read_buffer.is_empty() && self.response.is_some() {
179 self.load_response_body()?;
180 }
181
182 if self.read_buffer.is_empty() {
183 return Ok(0); }
185
186 let length = self.read_buffer.len().min(buffer.len());
187 buffer[..length].copy_from_slice(&self.read_buffer[..length]);
188 self.read_buffer = self.read_buffer.slice(length..);
189
190 Ok(length)
191 }
192}
193
194impl Write for HyperHttpConnection {
195 fn write(&mut self, buf: &[u8]) -> Result<usize, HyperError> {
197 self.write_buffer.extend_from_slice(buf);
198 Ok(buf.len())
199 }
200
201 fn flush(&mut self) -> Result<(), HyperError> {
203 let request = self.request.as_mut().ok_or(HyperError::NoRequest)?;
204 let body_data = std::mem::take(&mut self.write_buffer);
205 *request.body_mut() = Full::from(body_data);
206 Ok(())
207 }
208}
209
210impl Connection for HyperHttpConnection {
211 type Headers = Self;
212 type Read = Self;
213 type RawConnectionError = HyperError;
214 type RawConnection = Self;
215
216 fn initiate_request<'a>(
218 &'a mut self,
219 method: Method,
220 uri: &'a str,
221 headers: &'a [(&'a str, &'a str)],
222 ) -> Result<(), Self::Error> {
223 let mapped_method = Self::map_method(method)?;
224 let header_map = Self::build_headers(headers)?;
225
226 let mut request_builder = Request::builder().method(mapped_method).uri(uri);
227 if let Some(headers_mut) = request_builder.headers_mut() {
228 headers_mut.extend(header_map);
229 }
230
231 let request = request_builder
232 .body(Full::from(Bytes::new()))
233 .map_err(HyperError::Http)?;
234
235 self.request = Some(request);
236 self.response = None;
237 self.read_buffer = Bytes::new();
238 self.write_buffer.clear();
239
240 Ok(())
241 }
242
243 fn is_request_initiated(&self) -> bool {
245 self.request.is_some()
246 }
247
248 fn initiate_response(&mut self) -> Result<(), Self::Error> {
250 let request = self.request.take().ok_or(HyperError::NoRequest)?;
251 let response_future = self.client.request(request);
252 let response = self
253 .rt
254 .block_on(response_future)
255 .map_err(HyperError::Client)?;
256
257 self.response = Some(response);
258 Ok(())
259 }
260
261 fn is_response_initiated(&self) -> bool {
263 self.response.is_some()
264 }
265
266 fn split(&mut self) -> (&Self::Headers, &mut Self::Read) {
268 let headers: *const Self = self;
270 let headers = unsafe { &*headers };
271 (headers, self)
272 }
273
274 fn raw_connection(&mut self) -> Result<&mut Self::RawConnection, Self::Error> {
276 Ok(self)
277 }
278}
279
#[cfg(test)]
mod tests {
    use super::*;
    use embedded_svc::http::client::Client;

    /// Reads `source` until a read reports zero bytes and returns everything received.
    ///
    /// Panics with the debug form of the error when a read fails.
    fn drain<R>(source: &mut R) -> Vec<u8>
    where
        R: Read,
        R::Error: core::fmt::Debug,
    {
        let mut collected = Vec::new();
        let mut chunk = [0u8; 1024];

        loop {
            let received = match source.read(&mut chunk) {
                Ok(count) => count,
                Err(e) => panic!("{:?}", e),
            };
            if received == 0 {
                return collected;
            }
            collected.extend_from_slice(&chunk[..received]);
        }
    }

    /// Sends a GET request to a public endpoint and prints the response body.
    #[test]
    fn test_request_and_response_flow() {
        let mut client = Client::wrap(HyperHttpConnection::new().unwrap());

        let mut response = client
            .get("https://httpbin.org/get")
            .unwrap()
            .submit()
            .unwrap();

        let payload = drain(&mut response);
        println!("{}", str::from_utf8(&payload).unwrap());
    }

    /// Sends a POST request with a JSON body to a public endpoint and prints
    /// the response body.
    #[test]
    fn test_write_body_and_send() {
        let mut client = Client::wrap(HyperHttpConnection::new().unwrap());

        let json = r#"{"test": 2}"#;
        let len = json.len().to_string();
        let headers = &[
            ("User-Agent", "TestAgent"),
            ("Content-Type", "application/json"),
            ("Content-Length", len.as_str()),
        ];

        let mut request = client.post("https://httpbin.org/post", headers).unwrap();
        request.write(json.as_bytes()).unwrap();
        request.flush().unwrap();
        let mut response = request.submit().unwrap();

        let payload = drain(&mut response);
        println!("{}", str::from_utf8(&payload).unwrap());
    }
}