native_svc/
lib.rs

//! Hyper HTTP connection implementation using `hyper` and the `tokio` runtime.
2//!
3//! This module provides a `HyperHttpConnection` type that implements the `embedded_svc`
4//! HTTP client `Connection` trait, allowing synchronous-style HTTP requests on top of
5//! the asynchronous `hyper` library.
6
7pub mod error;
8
9use crate::error::HyperError;
10use embedded_svc::http::client::Connection;
11use embedded_svc::http::{Headers, Method, Status};
12use embedded_svc::io::{ErrorType, Read, Write};
13use http_body_util::{BodyExt, Full};
14use hyper::body::{Bytes, Incoming};
15use hyper::header::{HeaderName, HeaderValue};
16use hyper::{HeaderMap, Request, Response};
17use hyper_tls::HttpsConnector;
18use hyper_util::client::legacy::Client;
19use hyper_util::client::legacy::connect::HttpConnector;
20use hyper_util::rt::TokioExecutor;
21use tokio::runtime::Runtime;
22
/// Initial capacity, in bytes, reserved for the internal write buffer.
///
/// The buffer is a `Vec<u8>` created with `Vec::with_capacity`, so this is only
/// the starting allocation, not an upper bound on the request body size.
const DEFAULT_BUFFER_SIZE: usize = 8192;

/// Type alias for the Hyper client used by this module: an `HttpsConnector`
/// wrapping the plain `HttpConnector`, sending `Full<Bytes>` request bodies.
type HyperClient = Client<HttpsConnector<HttpConnector>, Full<Bytes>>;
28
/// An HTTP connection using the Hyper library and Tokio runtime.
///
/// `HyperHttpConnection` wraps a synchronous-style API on top of an async `hyper`
/// client, implementing the `embedded_svc::http::client::Connection` trait.
/// Every network operation blocks the calling thread on the connection's own
/// Tokio runtime until the underlying future completes.
///
/// # Example
///
/// ```no_run
/// use embedded_svc::http::client::Client;
/// use native_svc::HyperHttpConnection;
///
/// let conn = HyperHttpConnection::new().unwrap();
/// let mut client = Client::wrap(conn);
/// let request = client.get("https://example.com").unwrap();
/// let mut response = request.submit().unwrap();
/// // read, process, etc.
/// ```
pub struct HyperHttpConnection {
    /// Tokio runtime used to block on Hyper futures from synchronous code.
    rt: Runtime,
    /// Hyper client that actually sends the requests.
    client: HyperClient,
    /// Request under construction: set by `initiate_request`, taken by
    /// `initiate_response`.
    request: Option<Request<Full<Bytes>>>,
    /// Most recently received response; taken (left as `None`) once its body is
    /// loaded into `read_buffer`.
    response: Option<Response<Incoming>>,
    /// Response body bytes that have not yet been handed out by `read`.
    read_buffer: Bytes,
    /// Request body bytes accumulated by `write`.
    write_buffer: Vec<u8>,
}
54
55impl HyperHttpConnection {
56    /// Creates a new `HyperHttpConnection` instance.
57    ///
58    /// Initializes a Tokio runtime, a TLS-enabled Hyper client, and
59    /// prepares internal buffers. Returns an error if the runtime
60    /// cannot be created.
61    pub fn new() -> Result<Self, HyperError> {
62        let https = HttpsConnector::new();
63        let client = Client::builder(TokioExecutor::new()).build(https);
64        let rt = Runtime::new().map_err(HyperError::RuntimeCreation)?;
65
66        Ok(Self {
67            rt,
68            client,
69            request: None,
70            response: None,
71            read_buffer: Bytes::new(),
72            write_buffer: Vec::with_capacity(DEFAULT_BUFFER_SIZE),
73        })
74    }
75
76    /// Helper for mapping the embedded-svc HTTP `Method` enum to `hyper::Method`.
77    ///
78    /// Returns an error if the provided method is unsupported.
79    fn map_method(method: Method) -> Result<hyper::Method, HyperError> {
80        match method {
81            Method::Delete => Ok(hyper::Method::DELETE),
82            Method::Get => Ok(hyper::Method::GET),
83            Method::Head => Ok(hyper::Method::HEAD),
84            Method::Post => Ok(hyper::Method::POST),
85            Method::Put => Ok(hyper::Method::PUT),
86            Method::Connect => Ok(hyper::Method::CONNECT),
87            Method::Options => Ok(hyper::Method::OPTIONS),
88            Method::Trace => Ok(hyper::Method::TRACE),
89            Method::Patch => Ok(hyper::Method::PATCH),
90            _ => Err(HyperError::UnsupportedMethod(format!("{method:?}"))),
91        }
92    }
93
94    /// Constructs a `HeaderMap` from a slice of `(name, value)` pairs.
95    ///
96    /// Performs validation on header names and values, returning an error
97    /// if any are invalid.
98    fn build_headers(headers: &[(&str, &str)]) -> Result<HeaderMap, HyperError> {
99        let mut header_map = HeaderMap::with_capacity(headers.len());
100
101        for &(name, value) in headers {
102            let header_name =
103                HeaderName::from_bytes(name.as_bytes()).map_err(HyperError::InvalidHeaderName)?;
104            let header_value =
105                HeaderValue::from_str(value).map_err(HyperError::InvalidHeaderValue)?;
106            header_map.insert(header_name, header_value);
107        }
108
109        Ok(header_map)
110    }
111
112    /// Ensures that a response has been received, returning a reference to it.
113    ///
114    /// Returns `HyperError::NoResponse` if no response is available.
115    fn ensure_response(&self) -> Result<&Response<Incoming>, HyperError> {
116        self.response.as_ref().ok_or(HyperError::NoResponse)
117    }
118
119    /// Loads the entire response body into the internal read buffer.
120    ///
121    /// This consumes the `Response<Incoming>` and collects its body
122    /// into a contiguous `Bytes` buffer for streamline `Read` operations.
123    fn load_response_body(&mut self) -> Result<(), HyperError> {
124        if let Some(mut response) = self.response.take() {
125            let body_future = response.body_mut().collect();
126            let body = self.rt.block_on(body_future).map_err(HyperError::Hyper)?;
127            self.read_buffer = body.to_bytes();
128        }
129        Ok(())
130    }
131}
132
133impl Default for HyperHttpConnection {
134    /// Provides a default instance, panicking on failure.
135    ///
136    /// Equivalent to calling `HyperHttpConnection::new().unwrap()`.
137    fn default() -> Self {
138        Self::new().expect("Failed to create HyperHttpConnection")
139    }
140}
141
impl ErrorType for HyperHttpConnection {
    /// The error type returned by this connection's I/O operations (`read`,
    /// `write`, `flush`) and by its `Connection` methods.
    type Error = HyperError;
}
146
147impl Status for HyperHttpConnection {
148    /// Returns the HTTP status code of the last response, or 500 if none.
149    fn status(&self) -> u16 {
150        self.ensure_response()
151            .map(|response| response.status().as_u16())
152            .unwrap_or(500)
153    }
154
155    /// Returns the reason phrase of the last response status, if available.
156    fn status_message(&self) -> Option<&'_ str> {
157        self.ensure_response()
158            .ok()
159            .and_then(|response| response.status().canonical_reason())
160    }
161}
162
163impl Headers for HyperHttpConnection {
164    /// Retrieves a header value by name from the last response, if set.
165    fn header(&self, name: &str) -> Option<&'_ str> {
166        self.ensure_response()
167            .ok()
168            .and_then(|response| response.headers().get(name))
169            .and_then(|value| value.to_str().ok())
170    }
171}
172
173impl Read for HyperHttpConnection {
174    /// Reads data from the internal buffer, loading the response
175    /// body if needed. Returns `Ok(0)` on EOF.
176    fn read(&mut self, buffer: &mut [u8]) -> Result<usize, Self::Error> {
177        // Load the body if buffer empty and response exists
178        if self.read_buffer.is_empty() && self.response.is_some() {
179            self.load_response_body()?;
180        }
181
182        if self.read_buffer.is_empty() {
183            return Ok(0); // EOF
184        }
185
186        let length = self.read_buffer.len().min(buffer.len());
187        buffer[..length].copy_from_slice(&self.read_buffer[..length]);
188        self.read_buffer = self.read_buffer.slice(length..);
189
190        Ok(length)
191    }
192}
193
194impl Write for HyperHttpConnection {
195    /// Buffers data to be sent in the request body.
196    fn write(&mut self, buf: &[u8]) -> Result<usize, HyperError> {
197        self.write_buffer.extend_from_slice(buf);
198        Ok(buf.len())
199    }
200
201    /// Finalizes the request body by replacing it with the buffered data.
202    fn flush(&mut self) -> Result<(), HyperError> {
203        let request = self.request.as_mut().ok_or(HyperError::NoRequest)?;
204        let body_data = std::mem::take(&mut self.write_buffer);
205        *request.body_mut() = Full::from(body_data);
206        Ok(())
207    }
208}
209
210impl Connection for HyperHttpConnection {
211    type Headers = Self;
212    type Read = Self;
213    type RawConnectionError = HyperError;
214    type RawConnection = Self;
215
216    /// Begins constructing an HTTP request with method, URI, and headers.
217    fn initiate_request<'a>(
218        &'a mut self,
219        method: Method,
220        uri: &'a str,
221        headers: &'a [(&'a str, &'a str)],
222    ) -> Result<(), Self::Error> {
223        let mapped_method = Self::map_method(method)?;
224        let header_map = Self::build_headers(headers)?;
225
226        let mut request_builder = Request::builder().method(mapped_method).uri(uri);
227        if let Some(headers_mut) = request_builder.headers_mut() {
228            headers_mut.extend(header_map);
229        }
230
231        let request = request_builder
232            .body(Full::from(Bytes::new()))
233            .map_err(HyperError::Http)?;
234
235        self.request = Some(request);
236        self.response = None;
237        self.read_buffer = Bytes::new();
238        self.write_buffer.clear();
239
240        Ok(())
241    }
242
243    /// Returns `true` if a request has been initiated.
244    fn is_request_initiated(&self) -> bool {
245        self.request.is_some()
246    }
247
248    /// Sends the initiated request and stores the response.
249    fn initiate_response(&mut self) -> Result<(), Self::Error> {
250        let request = self.request.take().ok_or(HyperError::NoRequest)?;
251        let response_future = self.client.request(request);
252        let response = self
253            .rt
254            .block_on(response_future)
255            .map_err(HyperError::Client)?;
256
257        self.response = Some(response);
258        Ok(())
259    }
260
261    /// Returns `true` if a response has been received.
262    fn is_response_initiated(&self) -> bool {
263        self.response.is_some()
264    }
265
266    /// Splits the connection into its header and body parts.
267    fn split(&mut self) -> (&Self::Headers, &mut Self::Read) {
268        // Safe pointer dance to return two references to self
269        let headers: *const Self = self;
270        let headers = unsafe { &*headers };
271        (headers, self)
272    }
273
274    /// Returns a mutable reference to the raw connection.
275    fn raw_connection(&mut self) -> Result<&mut Self::RawConnection, Self::Error> {
276        Ok(self)
277    }
278}
279
#[cfg(test)]
mod tests {
    use super::*;
    use embedded_svc::http::client::Client;

    /// Reads `reader` until it reports EOF (`Ok(0)`) and returns all bytes.
    ///
    /// Panics with the error's `Debug` output if a read fails.
    fn read_to_end<R>(reader: &mut R) -> Vec<u8>
    where
        R: Read,
        R::Error: std::fmt::Debug,
    {
        let mut body = Vec::new();
        let mut buf = [0u8; 1024];

        loop {
            match reader.read(&mut buf) {
                Ok(0) => return body, // EOF
                Ok(n) => body.extend_from_slice(&buf[..n]),
                Err(e) => panic!("{:?}", e),
            }
        }
    }

    /// Tests a full GET request/response cycle against httpbin.org.
    ///
    /// Requires network access.
    #[test]
    fn test_request_and_response_flow() {
        let conn = HyperHttpConnection::new().unwrap();
        let mut client = Client::wrap(conn);

        let request = client.get("https://httpbin.org/get").unwrap();
        let mut response = request.submit().unwrap();

        // Check the status before reading: loading the body drops the stored
        // response, after which the status is no longer available.
        assert_eq!(response.status(), 200);

        let body = read_to_end(&mut response);
        assert!(!body.is_empty());

        // `std::str::from_utf8` also compiles on toolchains that predate the
        // inherent `str::from_utf8`.
        println!("{}", std::str::from_utf8(&body).unwrap());
    }

    /// Tests sending a JSON body via POST and reading the response.
    ///
    /// Requires network access.
    #[test]
    fn test_write_body_and_send() {
        let conn = HyperHttpConnection::new().unwrap();
        let mut client = Client::wrap(conn);

        let body = r#"{"test": 2}"#;
        let len = body.len().to_string();
        let headers = &[
            ("User-Agent", "TestAgent"),
            ("Content-Type", "application/json"),
            ("Content-Length", &len),
        ];
        let mut request = client.post("https://httpbin.org/post", headers).unwrap();
        request.write(body.as_bytes()).unwrap();
        request.flush().unwrap();
        let mut response = request.submit().unwrap();

        // Check the status before reading: loading the body drops the stored
        // response, after which the status is no longer available.
        assert_eq!(response.status(), 200);

        let body = read_to_end(&mut response);
        assert!(!body.is_empty());

        println!("{}", std::str::from_utf8(&body).unwrap());
    }
}