drogue_http_client/
con.rs

1use crate::{NoOpResponseHandler, ResponseHandler, Sink};
2use core::str::from_utf8;
3use heapless::{ArrayLength, Vec};
4use httparse::Status;
5
6/// An HTTP connection.
7pub struct HttpConnection<IN>
8where
9    IN: ArrayLength<u8>,
10{
11    // inbound transport buffer
12    inbound: Vec<u8, IN>,
13}
14
15impl<IN> HttpConnection<IN>
16where
17    IN: ArrayLength<u8>,
18{
19    /// Create a new instance.
20    ///
21    /// **Note:** The connection does not establish a new connection on e.g. a TCP stack. It more
22    /// manages the state of an HTTP connection.
23    pub fn new() -> Self {
24        HttpConnection {
25            inbound: Vec::new(),
26        }
27    }
28
29    /// Begin a new HTTP request.
30    ///
31    /// The request will only be sent to the sink (server) when one of the "execute" functions
32    /// is being invoked.
33    pub fn begin<'req>(
34        self,
35        method: &'static str,
36        path: &'static str,
37    ) -> RequestBuilder<'req, IN, NoOpResponseHandler> {
38        log::debug!("Begin new request - method: {}, path: {}", method, path);
39
40        RequestBuilder {
41            connection: self,
42            method,
43            path,
44            headers: None,
45            handler: NoOpResponseHandler,
46        }
47    }
48
49    /// Begin a new POST HTTP request.
50    pub fn post<'req>(self, path: &'static str) -> RequestBuilder<'req, IN, NoOpResponseHandler> {
51        self.begin("POST", path)
52    }
53
54    pub(crate) fn send_request<S, OUT>(
55        &mut self,
56        sink: &mut S,
57        method: &str,
58        path: &str,
59        headers: Option<&[(&str, &str)]>,
60        payload: Option<&[u8]>,
61    ) -> Result<(), ()>
62    where
63        S: Sink,
64        OUT: ArrayLength<u8>,
65    {
66        let mut out = Vec::<u8, OUT>::new();
67
68        // create headers
69        self.create_request_headers(&mut out, method, path, headers, payload.map(|b| b.len()))
70            .map_err(|_| ())?;
71
72        // send headers
73        sink.send(&out)?;
74
75        // send payload
76        if let Some(payload) = payload {
77            sink.send(payload)?;
78        }
79
80        Ok(())
81    }
82
83    fn create_request_headers(
84        &self,
85        w: &mut dyn core::fmt::Write,
86        method: &str,
87        path: &str,
88        headers: Option<&[(&str, &str)]>,
89        content_length: Option<usize>,
90    ) -> Result<(), core::fmt::Error> {
91        write!(w, "{} {} HTTP/1.1\r\n", method, path)?;
92        if let Some(headers) = headers {
93            if let Some(content_length) = content_length {
94                write!(w, "{}: {}\r\n", "Content-Length", content_length)?;
95            }
96            for header in headers {
97                write!(w, "{}: {}\r\n", header.0, header.1)?;
98            }
99        }
100        write!(w, "\r\n")?;
101
102        Ok(())
103    }
104
105    pub(crate) fn closed(&mut self) {
106        // FIXME: mark as closed
107    }
108}
109
110/// A request builder, which helps to gather all required information before executing the request.
111pub struct RequestBuilder<'req, IN, R>
112where
113    IN: ArrayLength<u8>,
114    R: ResponseHandler,
115{
116    connection: HttpConnection<IN>,
117    method: &'static str,
118    path: &'static str,
119    headers: Option<&'req [(&'req str, &'req str)]>,
120    handler: R,
121}
122
123impl<'req, IN, R> RequestBuilder<'req, IN, R>
124where
125    IN: ArrayLength<u8>,
126    R: ResponseHandler,
127{
128    /// Set the HTTP headers to send.
129    pub fn headers(mut self, headers: &'req [(&'req str, &'req str)]) -> Self {
130        self.headers = Some(headers);
131        self
132    }
133
134    /// Set the handler that will process the response.
135    pub fn handler<RN: ResponseHandler>(self, handler: RN) -> RequestBuilder<'req, IN, RN> {
136        RequestBuilder {
137            connection: self.connection,
138            headers: self.headers,
139            method: self.method,
140            path: self.path,
141            handler,
142        }
143    }
144
145    /// Execute the request, without any request payload.
146    pub fn execute<S, OUT>(self, sink: &mut S) -> Request<IN, R>
147    where
148        S: Sink,
149        OUT: ArrayLength<u8>,
150    {
151        self.execute_with::<S, OUT>(sink, None)
152    }
153
154    /// Execute the request, optionally providing some payload.
155    pub fn execute_with<S, OUT>(mut self, sink: &mut S, payload: Option<&[u8]>) -> Request<IN, R>
156    where
157        S: Sink,
158        OUT: ArrayLength<u8>,
159    {
160        // FIXME: handle error
161        self.connection
162            .send_request::<S, OUT>(sink, self.method, self.path, self.headers, payload);
163        let connection = self.connection;
164        let handler = self.handler;
165        Request {
166            connection,
167            handler,
168            state: State::Header,
169            processed_bytes: 0,
170        }
171    }
172}
173
/// The processing state of an ongoing request; selects how pushed data is handled.
#[derive(Copy, Clone, Debug)]
enum State {
    /// Inbound data is buffered and parsed as the response header.
    Header,
    /// The header announced a body of the given total size (from `Content-Length`).
    Payload(usize),
    /// The response has been fully processed.
    Complete,
    /// The header had no usable `Content-Length`, so the size of the body is unknown.
    UnlimitedPayload,
}
181
/// The HTTP response header.
///
/// Passed to the response handler once the response header has been parsed.
#[derive(Debug)]
pub struct Response<'a> {
    /// The HTTP version as reported by the parser, or `0` if the parser reported none.
    pub version: u8,
    /// The HTTP status code, or `0` if the parser reported none.
    pub code: u16,
    /// The reason phrase of the status line, or an empty string if the parser reported none.
    pub reason: &'a str,
}
189
190/// The ongoing HTTP request.
191pub struct Request<IN, R>
192where
193    IN: ArrayLength<u8>,
194    R: ResponseHandler,
195{
196    // connection
197    pub(crate) connection: HttpConnection<IN>,
198    // current handler
199    handler: R,
200    // current state
201    state: State,
202    // processed bytes
203    processed_bytes: usize,
204}
205
206impl<IN, R> Request<IN, R>
207where
208    IN: ArrayLength<u8>,
209    R: ResponseHandler,
210{
211    /// Check if the request is completely processed.
212    pub fn is_complete(&self) -> bool {
213        match self.state {
214            State::Complete => true,
215            _ => false,
216        }
217    }
218
219    fn push(&mut self, data: Result<Option<&[u8]>, ()>) {
220        log::debug!("Pushing data: {:?}", data.map(|o| o.map(|b| from_utf8(b))),);
221        match self.state {
222            State::Header => self.push_header(data),
223            State::Payload(size) => self.push_sized_payload(size, data),
224            State::UnlimitedPayload => self.push_payload(data),
225            State::Complete => self.push_complete_payload(data),
226        }
227    }
228
229    fn push_header(&mut self, data: Result<Option<&[u8]>, ()>) {
230        log::debug!("Current data: {:?}", from_utf8(&self.connection.inbound));
231
232        match data {
233            Ok(Some(data)) => {
234                self.connection.inbound.extend_from_slice(data).ok();
235
236                let mut headers = [httparse::EMPTY_HEADER; 16];
237                let mut response = httparse::Response::new(&mut headers);
238
239                match response.parse(&self.connection.inbound) {
240                    Ok(Status::Complete(len)) => {
241                        log::debug!("Completed({})", len);
242
243                        let content_size = response
244                            .headers
245                            .iter()
246                            .find(|e| e.name.eq_ignore_ascii_case("content-length"));
247
248                        // eval next state
249                        // FIXME: handle error
250                        self.state = match content_size {
251                            Some(header) => from_utf8(header.value)
252                                .map_err(|_| ())
253                                .and_then(|v| v.parse::<usize>().map_err(|_| ()))
254                                .map_or(State::UnlimitedPayload, |size| State::Payload(size)),
255                            None => State::UnlimitedPayload,
256                        };
257
258                        // log::debug!("Headers: {:?}", response.headers);
259                        log::debug!("Continue with: {:?}", self.state);
260
261                        // handle response
262                        self.handler.response(Response {
263                            version: response.version.unwrap_or_default(),
264                            code: response.code.unwrap_or_default(),
265                            reason: response.reason.unwrap_or_default(),
266                        });
267
268                        // clear connection buffer
269
270                        let buffer_len = self.connection.inbound.len();
271                        let data_len = data.len();
272
273                        log::debug!("Len = {}, dLen = {}, bLen = {}", len, data_len, buffer_len);
274
275                        // push on remaining data
276
277                        let start = len - (buffer_len - data_len);
278                        let rem_data = &data[start..];
279
280                        log::debug!(
281                            "Push bytes [{}..] after header to payload processing",
282                            start
283                        );
284
285                        self.push(Ok(Some(rem_data)));
286
287                        // clear buffer
288
289                        self.connection.inbound.clear();
290                    }
291                    Ok(Status::Partial) => {}
292                    Err(e) => {
293                        log::info!("Parse error: {:?}", e);
294                    }
295                }
296            }
297            Ok(None) => {
298                // FIXME: handle close
299            }
300            Err(_) => {
301                // FIXME: handle error
302            }
303        }
304    }
305
306    fn push_payload(&mut self, data: Result<Option<&[u8]>, ()>) {
307        log::debug!("More data: {:?}", data);
308
309        self.handler.more_payload(data);
310    }
311
312    fn push_complete_payload(&mut self, data: Result<Option<&[u8]>, ()>) {
313        log::debug!("More data (overflow): {:?}", data);
314        match data {
315            Ok(Some(data)) => {
316                // FIXME: handle error
317                self.connection.inbound.extend_from_slice(data);
318            }
319            Ok(None) | Err(_) => self.connection.closed(),
320        }
321    }
322
323    fn push_sized_payload(&mut self, expected_bytes: usize, data: Result<Option<&[u8]>, ()>) {
324        log::debug!("More data (sized): {:?}", data);
325
326        match data {
327            Ok(Some(data)) => {
328                let len = data.len();
329                let rem = expected_bytes - self.processed_bytes;
330                if len >= rem {
331                    self.handler.more_payload(Ok(Some(&data[0..rem])));
332                    // mark as complete
333                    self.state = State::Complete;
334                    // notify about complete
335                    self.handler.more_payload(Ok(None));
336                } else {
337                    self.handler.more_payload(Ok(Some(data)));
338                    self.processed_bytes += len;
339                }
340            }
341            Ok(None) => {
342                // FIXME: check for error
343            }
344            Err(_) => {}
345        }
346    }
347
348    /// Push more inbound data to the HTTP processing.
349    pub fn push_data(&mut self, data: &[u8]) {
350        self.push(Ok(Some(data)))
351    }
352
353    /// Notify the HTTP processing that the source has closed.
354    pub fn push_close(&mut self) {
355        self.push(Ok(None))
356    }
357
358    /// Stop processing the request, gives back the handler and connection.
359    pub fn complete(self) -> (HttpConnection<IN>, R) {
360        (self.connection, self.handler)
361    }
362}