Skip to main content

h11/
_connection.rs

1use crate::_events::*;
2use crate::_headers::*;
3use crate::_readers::*;
4use crate::_receivebuffer::*;
5use crate::_state::*;
6use crate::_util::*;
7use crate::_writers::*;
8use std::collections::HashMap;
9use std::collections::HashSet;
10
11static DEFAULT_MAX_INCOMPLETE_EVENT_SIZE: usize = 16 * 1024;
12
13enum RequestOrResponse {
14    Request(Request),
15    Response(Response),
16}
17
18impl RequestOrResponse {
19    pub fn headers(&self) -> &Headers {
20        match self {
21            Self::Request(request) => &request.headers,
22            Self::Response(response) => &response.headers,
23        }
24    }
25
26    pub fn http_version(&self) -> &Vec<u8> {
27        match self {
28            Self::Request(request) => &request.http_version,
29            Self::Response(response) => &response.http_version,
30        }
31    }
32}
33
34impl From<Request> for RequestOrResponse {
35    fn from(value: Request) -> Self {
36        Self::Request(value)
37    }
38}
39
40impl From<Response> for RequestOrResponse {
41    fn from(value: Response) -> Self {
42        Self::Response(value)
43    }
44}
45
46impl TryFrom<Event> for RequestOrResponse {
47    type Error = ProtocolError;
48
49    fn try_from(value: Event) -> Result<Self, Self::Error> {
50        match value {
51            Event::Request(request) => Ok(Self::Request(request)),
52            Event::NormalResponse(response) => Ok(Self::Response(response)),
53            Event::InformationalResponse(response) => Ok(Self::Response(response)),
54            _ => Err(ProtocolError::LocalProtocolError(
55                format!("Expected request or response event, got {:?}", value).into(),
56            )),
57        }
58    }
59}
60
61fn _keep_alive<T: Into<RequestOrResponse>>(event: T) -> bool {
62    let event: RequestOrResponse = event.into();
63    let connection = get_comma_header(event.headers(), b"connection");
64    if connection.contains(&b"close".to_vec()) {
65        return false;
66    }
67    if event.http_version() < &b"1.1".to_vec() {
68        return false;
69    }
70    return true;
71}
72
73fn _body_framing<T: Into<RequestOrResponse>>(
74    request_method: &[u8],
75    event: T,
76) -> Result<(&str, isize), ProtocolError> {
77    let event: RequestOrResponse = event.into();
78    if let RequestOrResponse::Response(response) = &event {
79        if response.status_code == 204
80            || response.status_code == 304
81            || request_method == b"HEAD"
82            || (request_method == b"CONNECT"
83                && 200 <= response.status_code
84                && response.status_code < 300)
85        {
86            return Ok(("content-length", 0));
87        }
88        if response.status_code < 200 {
89            return Err(ProtocolError::LocalProtocolError(
90                "Informational responses do not have a message body".into(),
91            ));
92        }
93    }
94
95    let transfer_encodings = get_comma_header(event.headers(), b"transfer-encoding");
96    if !transfer_encodings.is_empty() {
97        if transfer_encodings != vec![b"chunked".to_vec()] {
98            return Err(ProtocolError::LocalProtocolError(
99                ("Only Transfer-Encoding: chunked is supported", 501).into(),
100            ));
101        }
102        return Ok(("chunked", 0));
103    }
104
105    let content_lengths = get_comma_header(event.headers(), b"content-length");
106    if !content_lengths.is_empty() {
107        let length = std::str::from_utf8(&content_lengths[0])
108            .map_err(|_| ProtocolError::LocalProtocolError("bad Content-Length".into()))?
109            .parse()
110            .map_err(|_| ProtocolError::LocalProtocolError("bad Content-Length".into()))?;
111        return Ok(("content-length", length));
112    }
113
114    if let RequestOrResponse::Request(_) = event {
115        return Ok(("content-length", 0));
116    } else {
117        return Ok(("http/1.0", 0));
118    }
119}
120
121fn invalid_body_framing(framing_type: &str) -> ProtocolError {
122    ProtocolError::LocalProtocolError(
123        format!("Invalid role and framing type combination: {framing_type}").into(),
124    )
125}
126
127/// Sans-I/O HTTP/1.1 connection state machine.
128///
129/// A `Connection` owns parser and serializer state for one endpoint role. It
130/// does not perform I/O directly: callers supply received bytes with
131/// [`Connection::receive_data`], consume inbound events with
132/// [`Connection::next_event`], and write bytes returned by [`Connection::send`]
133/// to their transport.
134pub struct Connection {
135    /// Role represented by this connection object.
136    pub our_role: Role,
137    /// Peer role.
138    pub their_role: Role,
139    _cstate: ConnectionState,
140    _writer: Option<Box<WriterFnMut>>,
141    _reader: Option<Box<dyn Reader>>,
142    _max_incomplete_event_size: usize,
143    _receive_buffer: ReceiveBuffer,
144    _receive_buffer_closed: bool,
145    /// Most recently observed peer HTTP version, without the `HTTP/` prefix.
146    pub their_http_version: Option<Vec<u8>>,
147    _request_method: Option<Vec<u8>>,
148    client_is_waiting_for_100_continue: bool,
149}
150
151impl Connection {
152    /// Creates a connection for the given local role.
153    ///
154    /// `max_incomplete_event_size` limits buffered incomplete inbound data.
155    /// Passing `None` uses the crate default.
156    pub fn new(our_role: Role, max_incomplete_event_size: Option<usize>) -> Self {
157        Self {
158            our_role,
159            their_role: if our_role == Role::Client {
160                Role::Server
161            } else {
162                Role::Client
163            },
164            _cstate: ConnectionState::new(),
165            _writer: match our_role {
166                Role::Client => Some(Box::new(write_request)),
167                Role::Server => Some(Box::new(write_response)),
168            },
169            _reader: match our_role {
170                Role::Server => Some(Box::new(IdleClientReader {})),
171                Role::Client => Some(Box::new(SendResponseServerReader {})),
172            },
173            _max_incomplete_event_size: max_incomplete_event_size
174                .unwrap_or(DEFAULT_MAX_INCOMPLETE_EVENT_SIZE),
175            _receive_buffer: ReceiveBuffer::new(),
176            _receive_buffer_closed: false,
177            their_http_version: None,
178            _request_method: None,
179            client_is_waiting_for_100_continue: false,
180        }
181    }
182
183    /// Returns a cloned map of both role states.
184    pub fn get_states(&self) -> HashMap<Role, State> {
185        self._cstate.states.clone()
186    }
187
188    /// Returns the current state for this endpoint.
189    pub fn get_our_state(&self) -> State {
190        self._cstate.states[&self.our_role]
191    }
192
193    /// Returns the current state for the peer endpoint.
194    pub fn get_their_state(&self) -> State {
195        self._cstate.states[&self.their_role]
196    }
197
198    /// Returns whether the client is waiting for a `100 Continue` response.
199    pub fn get_client_is_waiting_for_100_continue(&self) -> bool {
200        self.client_is_waiting_for_100_continue
201    }
202
203    /// Returns whether the peer is waiting for a `100 Continue` response.
204    pub fn get_they_are_waiting_for_100_continue(&self) -> bool {
205        self.their_role == Role::Client && self.client_is_waiting_for_100_continue
206    }
207
208    /// Starts the next keep-alive request/response cycle.
209    ///
210    /// This is valid only when both sides have completed the previous cycle and
211    /// the state machine allows reuse.
212    pub fn start_next_cycle(&mut self) -> Result<(), ProtocolError> {
213        let old_states = self._cstate.states.clone();
214        self._cstate.start_next_cycle()?;
215        self._request_method = None;
216        self.their_http_version = None;
217        self.client_is_waiting_for_100_continue = false;
218        self._respond_to_state_changes(old_states, None)?;
219        Ok(())
220    }
221
222    fn _process_error(&mut self, role: Role) {
223        let old_states = self._cstate.states.clone();
224        self._cstate.process_error(role);
225        let _ = self._respond_to_state_changes(old_states, None);
226    }
227
228    fn _server_switch_event(&self, event: Event) -> Option<Switch> {
229        if let Event::InformationalResponse(informational_response) = &event {
230            if informational_response.status_code == 101 {
231                return Some(Switch::SwitchUpgrade);
232            }
233        }
234        if let Event::NormalResponse(response) = &event {
235            if self
236                ._cstate
237                .pending_switch_proposals
238                .contains(&Switch::SwitchConnect)
239                && 200 <= response.status_code
240                && response.status_code < 300
241            {
242                return Some(Switch::SwitchConnect);
243            }
244        }
245        return None;
246    }
247
248    fn _process_event(&mut self, role: Role, event: Event) -> Result<(), ProtocolError> {
249        let old_states = self._cstate.states.clone();
250        if role == Role::Client {
251            if let Event::Request(request) = event.clone() {
252                if request.method == b"CONNECT" {
253                    self._cstate
254                        .process_client_switch_proposal(Switch::SwitchConnect);
255                }
256                if get_comma_header(&request.headers, b"upgrade").len() > 0 {
257                    self._cstate
258                        .process_client_switch_proposal(Switch::SwitchUpgrade);
259                }
260            }
261        }
262        let server_switch_event = if role == Role::Server {
263            self._server_switch_event(event.clone())
264        } else {
265            None
266        };
267        self._cstate
268            .process_event(role, (&event).into(), server_switch_event)?;
269
270        if let Event::Request(request) = event.clone() {
271            self._request_method = Some(request.method);
272        }
273
274        if role == self.their_role {
275            if let Event::Request(request) = event.clone() {
276                self.their_http_version = Some(request.http_version);
277            }
278            if let Event::NormalResponse(response) = event.clone() {
279                self.their_http_version = Some(response.http_version);
280            }
281            if let Event::InformationalResponse(informational_response) = event.clone() {
282                self.their_http_version = Some(informational_response.http_version);
283            }
284        }
285
286        if let Event::Request(request) = event.clone() {
287            if !_keep_alive(RequestOrResponse::from(request)) {
288                self._cstate.process_keep_alive_disabled();
289            }
290        }
291        if let Event::NormalResponse(response) = event.clone() {
292            if !_keep_alive(RequestOrResponse::from(response)) {
293                self._cstate.process_keep_alive_disabled();
294            }
295        }
296
297        if let Event::Request(request) = event.clone() {
298            if has_expect_100_continue(&request) {
299                self.client_is_waiting_for_100_continue = true;
300            }
301        }
302        match (&event).into() {
303            EventType::InformationalResponse => {
304                self.client_is_waiting_for_100_continue = false;
305            }
306            EventType::NormalResponse => {
307                self.client_is_waiting_for_100_continue = false;
308            }
309            EventType::Data => {
310                if role == Role::Client {
311                    self.client_is_waiting_for_100_continue = false;
312                }
313            }
314            EventType::EndOfMessage => {
315                if role == Role::Client {
316                    self.client_is_waiting_for_100_continue = false;
317                }
318            }
319            _ => {}
320        }
321
322        self._respond_to_state_changes(old_states, Some(event))
323    }
324
325    fn _respond_to_state_changes(
326        &mut self,
327        old_states: HashMap<Role, State>,
328        event: Option<Event>,
329    ) -> Result<(), ProtocolError> {
330        if self.get_our_state() != old_states[&self.our_role] {
331            let state = self._cstate.states[&self.our_role];
332            self._writer = match state {
333                State::SendBody => {
334                    let request_method = self._request_method.clone().unwrap_or(vec![]);
335                    let event = event.clone().ok_or_else(|| {
336                        ProtocolError::LocalProtocolError(
337                            "Missing event for body framing".to_string().into(),
338                        )
339                    })?;
340                    let (framing_type, length) =
341                        _body_framing(&request_method, RequestOrResponse::try_from(event)?)?;
342
343                    match framing_type {
344                        "content-length" => Some(Box::new(content_length_writer(length))),
345                        "chunked" => Some(Box::new(chunked_writer())),
346                        "http/1.0" => Some(Box::new(http10_writer())),
347                        _ => return Err(invalid_body_framing(framing_type)),
348                    }
349                }
350                _ => match (&self.our_role, state) {
351                    (Role::Client, State::Idle) => Some(Box::new(write_request)),
352                    (Role::Server, State::Idle) => Some(Box::new(write_response)),
353                    (Role::Server, State::SendResponse) => Some(Box::new(write_response)),
354                    _ => None,
355                },
356            };
357        }
358        if self.get_their_state() != old_states[&self.their_role] {
359            self._reader = match self._cstate.states[&self.their_role] {
360                State::SendBody => {
361                    let request_method = self._request_method.clone().unwrap_or(vec![]);
362                    let event = event.clone().ok_or_else(|| {
363                        ProtocolError::LocalProtocolError(
364                            "Missing event for body framing".to_string().into(),
365                        )
366                    })?;
367                    let (framing_type, length) =
368                        _body_framing(&request_method, RequestOrResponse::try_from(event)?)?;
369                    match framing_type {
370                        "content-length" => Some(Box::new(ContentLengthReader::new(
371                            usize::try_from(length).map_err(|_| {
372                                ProtocolError::LocalProtocolError(
373                                    "negative Content-Length".to_string().into(),
374                                )
375                            })?,
376                        ))),
377                        "chunked" => Some(Box::new(ChunkedReader::new())),
378                        "http/1.0" => Some(Box::new(Http10Reader {})),
379                        _ => return Err(invalid_body_framing(framing_type)),
380                    }
381                }
382                _ => match (&self.their_role, self._cstate.states[&self.their_role]) {
383                    (Role::Client, State::Idle) => Some(Box::new(IdleClientReader {})),
384                    (Role::Server, State::Idle) => Some(Box::new(SendResponseServerReader {})),
385                    (Role::Server, State::SendResponse) => {
386                        Some(Box::new(SendResponseServerReader {}))
387                    }
388                    (Role::Client, State::Done) => Some(Box::new(ClosedReader {})),
389                    (Role::Client, State::MustClose) => Some(Box::new(ClosedReader {})),
390                    (Role::Client, State::Closed) => Some(Box::new(ClosedReader {})),
391                    (Role::Server, State::Done) => Some(Box::new(ClosedReader {})),
392                    (Role::Server, State::MustClose) => Some(Box::new(ClosedReader {})),
393                    (Role::Server, State::Closed) => Some(Box::new(ClosedReader {})),
394                    _ => None,
395                },
396            };
397        }
398        Ok(())
399    }
400
401    /// Returns bytes received after the current event stream paused.
402    ///
403    /// The boolean indicates whether EOF has been received.
404    pub fn get_trailing_data(&self) -> (Vec<u8>, bool) {
405        (
406            self._receive_buffer.bytes().to_vec(),
407            self._receive_buffer_closed,
408        )
409    }
410
411    /// Feeds received bytes into the connection.
412    ///
413    /// Passing an empty slice marks EOF. After EOF, passing more bytes is a
414    /// local protocol error.
415    pub fn receive_data(&mut self, data: &[u8]) -> Result<(), ProtocolError> {
416        Ok(if data.len() > 0 {
417            if self._receive_buffer_closed {
418                return Err(ProtocolError::LocalProtocolError(
419                    "received close, then received more data?".into(),
420                ));
421            }
422            self._receive_buffer.add(data);
423        } else {
424            self._receive_buffer_closed = true;
425        })
426    }
427
428    fn _extract_next_receive_event(&mut self) -> Result<Event, ProtocolError> {
429        let state = self.get_their_state();
430        if state == State::Done && self._receive_buffer.len() > 0 {
431            return Ok(Event::Paused());
432        }
433        if state == State::MightSwitchProtocol || state == State::SwitchedProtocol {
434            return Ok(Event::Paused());
435        }
436        let reader = self._reader.as_mut().ok_or_else(|| {
437            ProtocolError::RemoteProtocolError(
438                format!("No reader available for peer state {:?}", state).into(),
439            )
440        })?;
441        let event = reader.call(&mut self._receive_buffer)?;
442        if event.is_none() {
443            if self._receive_buffer.len() == 0 && self._receive_buffer_closed {
444                return reader.read_eof();
445            }
446        }
447        Ok(event.unwrap_or(Event::NeedData()))
448    }
449
450    /// Returns the next inbound protocol event.
451    ///
452    /// If more bytes are needed, returns [`Event::NeedData`]. If processing is
453    /// paused behind a completed message or protocol switch, returns
454    /// [`Event::Paused`].
455    pub fn next_event(&mut self) -> Result<Event, ProtocolError> {
456        if self.get_their_state() == State::Error {
457            return Err(ProtocolError::RemoteProtocolError(
458                "Can't receive data when peer state is ERROR".into(),
459            ));
460        }
461        match (|| {
462            let event = self._extract_next_receive_event()?;
463            match event {
464                Event::NeedData() | Event::Paused() => {}
465                _ => {
466                    self._process_event(self.their_role, event.clone())?;
467                }
468            };
469
470            if let Event::NeedData() = event.clone() {
471                if self._receive_buffer.len() > self._max_incomplete_event_size {
472                    return Err(ProtocolError::RemoteProtocolError(
473                        ("Receive buffer too long".to_string(), 431).into(),
474                    ));
475                }
476                if self._receive_buffer_closed {
477                    return Err(ProtocolError::RemoteProtocolError(
478                        "peer unexpectedly closed connection".to_string().into(),
479                    ));
480                }
481            }
482
483            Ok(event)
484        })() {
485            Err(error) => {
486                self._process_error(self.their_role);
487                match error {
488                    ProtocolError::LocalProtocolError(error) => {
489                        Err(error._reraise_as_remote_protocol_error().into())
490                    }
491                    _ => Err(error),
492                }
493            }
494            Ok(any) => Ok(any),
495        }
496    }
497
498    /// Serializes an outbound event and advances local state.
499    ///
500    /// Returns `Ok(Some(bytes))` for events that produce wire bytes and
501    /// `Ok(None)` for [`Event::ConnectionClosed`]. Invalid local sequencing or
502    /// invalid event contents return [`ProtocolError::LocalProtocolError`].
503    pub fn send(&mut self, mut event: Event) -> Result<Option<Vec<u8>>, ProtocolError> {
504        if self.get_our_state() == State::Error {
505            return Err(ProtocolError::LocalProtocolError(
506                "Can't send data when our state is ERROR".to_string().into(),
507            ));
508        }
509        event = if let Event::NormalResponse(response) = &event {
510            Event::NormalResponse(self._clean_up_response_headers_for_sending(response.clone())?)
511        } else {
512            event
513        };
514        let event_type: EventType = (&event).into();
515        let data = if event_type == EventType::ConnectionClosed {
516            Ok(vec![])
517        } else {
518            match self._writer.as_mut() {
519                Some(writer) => writer(event.clone()),
520                None => Err(ProtocolError::LocalProtocolError(
521                    "Can't send data when our state is not SEND_BODY"
522                        .to_string()
523                        .into(),
524                )),
525            }
526        }
527        .inspect_err(|_| {
528            self._process_error(self.our_role);
529        })?;
530
531        if let Err(error) = self._process_event(self.our_role, event.clone()) {
532            self._process_error(self.our_role);
533            return Err(error);
534        };
535
536        if event_type == EventType::ConnectionClosed {
537            return Ok(None);
538        } else {
539            Ok(Some(data))
540        }
541    }
542
543    /// Marks the local send side as errored after an external write failure.
544    pub fn send_failed(&mut self) {
545        self._process_error(self.our_role);
546    }
547
548    fn _clean_up_response_headers_for_sending(
549        &self,
550        response: Response,
551    ) -> Result<Response, ProtocolError> {
552        let mut headers = response.clone().headers;
553        let mut need_close = false;
554        let mut method_for_choosing_headers = self._request_method.clone().unwrap_or(vec![]);
555        if method_for_choosing_headers == b"HEAD".to_vec() {
556            method_for_choosing_headers = b"GET".to_vec();
557        }
558        let (framing_type, _) = _body_framing(&method_for_choosing_headers, response.clone())?;
559        if framing_type == "chunked" || framing_type == "http/1.0" {
560            headers = set_comma_header(&headers, b"content-length", vec![])?;
561            if self
562                .their_http_version
563                .clone()
564                .map(|v| v < b"1.1".to_vec())
565                .unwrap_or(true)
566            {
567                headers = set_comma_header(&headers, b"transfer-encoding", vec![])?;
568                if self._request_method.clone().unwrap_or(vec![]) != b"HEAD".to_vec() {
569                    need_close = true;
570                }
571            } else {
572                headers =
573                    set_comma_header(&headers, b"transfer-encoding", vec![b"chunked".to_vec()])?;
574            }
575        }
576        if !self._cstate.keep_alive || need_close {
577            let mut connection: HashSet<Vec<u8>> = get_comma_header(&headers, b"connection")
578                .into_iter()
579                .collect();
580            connection.retain(|x| x != &b"keep-alive".to_vec());
581            connection.insert(b"close".to_vec());
582            headers = set_comma_header(&headers, b"connection", connection.into_iter().collect())?;
583        }
584        return Ok(Response {
585            headers,
586            status_code: response.status_code,
587            http_version: response.http_version,
588            reason: response.reason,
589        });
590    }
591}
592
593#[cfg(test)]
594mod tests {
595    use super::*;
596
597    // from typing import cast, List, Type, Union, ValuesView
598
599    // from .._connection import Connection, NEED_DATA, PAUSED
600    // from .._events import (
601    //     ConnectionClosed,
602    //     Data,
603    //     EndOfMessage,
604    //     Event,
605    //     InformationalResponse,
606    //     Request,
607    //     Response,
608    // )
609    // from .._state import CLIENT, CLOSED, DONE, MUST_CLOSE, SERVER
610    // from .._util import Sentinel
611
612    // try:
613    //     from typing import Literal
614    // except ImportError:
615    //     from typing_extensions import Literal  # type: ignore
616
617    // def get_all_events(conn: Connection) -> List[Event]:
618    //     got_events = []
619    //     while True:
620    //         event = conn.next_event()
621    //         if event in (NEED_DATA, PAUSED):
622    //             break
623    //         event = cast(Event, event)
624    //         got_events.append(event)
625    //         if type(event) is ConnectionClosed:
626    //             break
627    //     return got_events
628
629    // def receive_and_get(conn: Connection, data: bytes) -> List[Event]:
630    //     conn.receive_data(data)
631    //     return get_all_events(conn)
632
633    // # Merges adjacent Data events, converts payloads to bytestrings, and removes
634    // # chunk boundaries.
635    // def normalize_data_events(in_events: List[Event]) -> List[Event]:
636    //     out_events: List[Event] = []
637    //     for event in in_events:
638    //         if type(event) is Data:
639    //             event = Data(data=bytes(event.data), chunk_start=False, chunk_end=False)
640    //         if out_events and type(out_events[-1]) is type(event) is Data:
641    //             out_events[-1] = Data(
642    //                 data=out_events[-1].data + event.data,
643    //                 chunk_start=out_events[-1].chunk_start,
644    //                 chunk_end=out_events[-1].chunk_end,
645    //             )
646    //         else:
647    //             out_events.append(event)
648    //     return out_events
649
650    // # Given that we want to write tests that push some events through a Connection
651    // # and check that its state updates appropriately... we might as make a habit
652    // # of pushing them through two Connections with a fake network link in
653    // # between.
654    // class ConnectionPair:
655    //     def __init__(self) -> None:
656    //         self.conn = {CLIENT: Connection(CLIENT), SERVER: Connection(SERVER)}
657    //         self.other = {CLIENT: SERVER, SERVER: CLIENT}
658
659    //     @property
660    //     def conns(self) -> ValuesView[Connection]:
661    //         return self.conn.values()
662
663    //     # expect="match" if expect=send_events; expect=[...] to say what expected
664    //     def send(
665    //         self,
666    //         role: Type[Sentinel],
667    //         send_events: Union[List[Event], Event],
668    //         expect: Union[List[Event], Event, Literal["match"]] = "match",
669    //     ) -> bytes:
670    //         if not isinstance(send_events, list):
671    //             send_events = [send_events]
672    //         data = b""
673    //         closed = False
674    //         for send_event in send_events:
675    //             new_data = self.conn[role].send(send_event)
676    //             if new_data is None:
677    //                 closed = True
678    //             else:
679    //                 data += new_data
680    //         # send uses b"" to mean b"", and None to mean closed
681    //         # receive uses b"" to mean closed, and None to mean "try again"
682    //         # so we have to translate between the two conventions
683    //         if data:
684    //             self.conn[self.other[role]].receive_data(data)
685    //         if closed:
686    //             self.conn[self.other[role]].receive_data(b"")
687    //         got_events = get_all_events(self.conn[self.other[role]])
688    //         if expect == "match":
689    //             expect = send_events
690    //         if not isinstance(expect, list):
691    //             expect = [expect]
692    //         assert got_events == expect
693    //         return data
694
695    use std::collections::HashMap;
696
697    pub fn get_all_events(conn: &mut Connection) -> Result<Vec<Event>, ProtocolError> {
698        let mut got_events = Vec::new();
699        loop {
700            let event = conn.next_event()?;
701            let event_type = EventType::from(&event);
702            if event_type == EventType::NeedData || event_type == EventType::Paused {
703                break;
704            }
705            got_events.push(event);
706            if event_type == EventType::ConnectionClosed {
707                break;
708            }
709        }
710        return Ok(got_events);
711    }
712
713    pub fn receive_and_get(
714        conn: &mut Connection,
715        data: &[u8],
716    ) -> Result<Vec<Event>, ProtocolError> {
717        conn.receive_data(data).unwrap();
718        return get_all_events(conn);
719    }
720
721    pub struct ConnectionPair {
722        pub conn: HashMap<Role, Connection>,
723        pub other: HashMap<Role, Role>,
724    }
725
726    impl ConnectionPair {
727        pub fn new() -> Self {
728            Self {
729                conn: HashMap::from([
730                    (Role::Client, Connection::new(Role::Client, None)),
731                    (Role::Server, Connection::new(Role::Server, None)),
732                ]),
733                other: HashMap::from([(Role::Client, Role::Server), (Role::Server, Role::Client)]),
734            }
735        }
736
737        pub fn send(
738            &mut self,
739            role: Role,
740            send_events: Vec<Event>,
741            expect: Option<Vec<Event>>,
742        ) -> Result<Vec<u8>, ProtocolError> {
743            let mut data = Vec::new();
744            let mut closed = false;
745            for send_event in &send_events {
746                match self.conn.get_mut(&role).unwrap().send(send_event.clone())? {
747                    Some(new_data) => data.extend(new_data),
748                    None => closed = true,
749                }
750            }
751            if !data.is_empty() {
752                self.conn
753                    .get_mut(&self.other[&role])
754                    .unwrap()
755                    .receive_data(&data)
756                    .unwrap();
757            }
758            if closed {
759                self.conn
760                    .get_mut(&self.other[&role])
761                    .unwrap()
762                    .receive_data(b"")
763                    .unwrap();
764            }
765            let got_events = get_all_events(self.conn.get_mut(&self.other[&role]).unwrap())?;
766            match expect {
767                Some(expect) => assert_eq!(got_events, expect),
768                None => assert_eq!(got_events, send_events),
769            };
770
771            Ok(data)
772        }
773    }
774
775    #[test]
776    fn test_keep_alive() {
777        assert!(_keep_alive(Request {
778            method: b"GET".to_vec(),
779            target: b"/".to_vec(),
780            headers: vec![(b"Host".to_vec(), b"Example.com".to_vec())].into(),
781            http_version: b"1.1".to_vec(),
782        }));
783        assert!(!_keep_alive(Request {
784            method: b"GET".to_vec(),
785            target: b"/".to_vec(),
786            headers: vec![
787                (b"Host".to_vec(), b"Example.com".to_vec()),
788                (b"Connection".to_vec(), b"close".to_vec()),
789            ]
790            .into(),
791            http_version: b"1.1".to_vec(),
792        }));
793        assert!(!_keep_alive(Request {
794            method: b"GET".to_vec(),
795            target: b"/".to_vec(),
796            headers: vec![
797                (b"Host".to_vec(), b"Example.com".to_vec()),
798                (b"Connection".to_vec(), b"a, b, cLOse, foo".to_vec()),
799            ]
800            .into(),
801            http_version: b"1.1".to_vec(),
802        }));
803        assert!(!_keep_alive(Request {
804            method: b"GET".to_vec(),
805            target: b"/".to_vec(),
806            headers: vec![].into(),
807            http_version: b"1.0".to_vec(),
808        }));
809
810        assert!(_keep_alive(Response {
811            status_code: 200,
812            headers: vec![].into(),
813            http_version: b"1.1".to_vec(),
814            reason: b"OK".to_vec(),
815        }));
816        assert!(!_keep_alive(Response {
817            status_code: 200,
818            headers: vec![(b"Connection".to_vec(), b"close".to_vec())].into(),
819            http_version: b"1.1".to_vec(),
820            reason: b"OK".to_vec(),
821        }));
822        assert!(!_keep_alive(Response {
823            status_code: 200,
824            headers: vec![(b"Connection".to_vec(), b"a, b, cLOse, foo".to_vec()),].into(),
825            http_version: b"1.1".to_vec(),
826            reason: b"OK".to_vec(),
827        }));
828        assert!(!_keep_alive(Response {
829            status_code: 200,
830            headers: vec![].into(),
831            http_version: b"1.0".to_vec(),
832            reason: b"OK".to_vec(),
833        }));
834    }
835
836    #[test]
837    fn test_body_framing() {
838        fn headers(cl: Option<usize>, te: bool) -> Headers {
839            let mut headers = vec![];
840            if let Some(cl) = cl {
841                headers.push((
842                    b"Content-Length".to_vec(),
843                    cl.to_string().as_bytes().to_vec(),
844                ));
845            }
846            if te {
847                headers.push((b"Transfer-Encoding".to_vec(), b"chunked".to_vec()));
848            }
849            headers.push((b"Host".to_vec(), b"example.com".to_vec()));
850            return headers.into();
851        }
852
853        fn resp(status_code: u16, cl: Option<usize>, te: bool) -> Response {
854            Response {
855                status_code,
856                headers: headers(cl, te),
857                http_version: b"1.1".to_vec(),
858                reason: b"OK".to_vec(),
859            }
860        }
861
862        fn req(cl: Option<usize>, te: bool) -> Request {
863            Request {
864                method: b"GET".to_vec(),
865                target: b"/".to_vec(),
866                headers: headers(cl, te),
867                http_version: b"1.1".to_vec(),
868            }
869        }
870
871        // Special cases where the headers are ignored:
872        for (cl, te) in vec![(Some(100), false), (None, true), (Some(100), true)] {
873            for (meth, r) in vec![
874                (b"HEAD".to_vec(), resp(200, cl, te)),
875                (b"GET".to_vec(), resp(204, cl, te)),
876                (b"GET".to_vec(), resp(304, cl, te)),
877            ] {
878                assert_eq!(_body_framing(&meth, r).unwrap(), ("content-length", 0));
879            }
880        }
881
882        // Transfer-encoding
883        for (cl, te) in vec![(None, true), (Some(100), true)] {
884            for (meth, r) in vec![
885                (b"".to_vec(), RequestOrResponse::from(req(cl, te))),
886                (b"GET".to_vec(), RequestOrResponse::from(resp(200, cl, te))),
887            ] {
888                assert_eq!(_body_framing(&meth, r).unwrap(), ("chunked", 0));
889            }
890        }
891
892        // Content-Length
893        for (meth, r) in vec![
894            (b"".to_vec(), RequestOrResponse::from(req(Some(100), false))),
895            (
896                b"GET".to_vec(),
897                RequestOrResponse::from(resp(200, Some(100), false)),
898            ),
899        ] {
900            assert_eq!(_body_framing(&meth, r).unwrap(), ("content-length", 100));
901        }
902
903        // No headers
904        assert_eq!(
905            _body_framing(b"", req(None, false)).unwrap(),
906            ("content-length", 0)
907        );
908        assert_eq!(
909            _body_framing(b"GET", resp(200, None, false)).unwrap(),
910            ("http/1.0", 0)
911        );
912
913        assert!(matches!(
914            _body_framing(b"GET", resp(100, None, false)),
915            Err(ProtocolError::LocalProtocolError(_))
916        ));
917    }
918
919    #[test]
920    fn test_connection_basics_and_content_length() {
921        let mut p = ConnectionPair::new();
922        assert_eq!(
923            p.send(
924                Role::Client,
925                vec![Request::new(
926                    b"GET".to_vec(),
927                    vec![
928                        (b"Host".to_vec(), b"example.com".to_vec()),
929                        (b"Content-Length".to_vec(), b"10".to_vec())
930                    ]
931                    .into(),
932                    b"/".to_vec(),
933                    b"1.1".to_vec(),
934                )
935                .unwrap()
936                .into(),],
937                None,
938            )
939            .unwrap(),
940            b"GET / HTTP/1.1\r\nHost: example.com\r\nContent-Length: 10\r\n\r\n".to_vec()
941        );
942        for (_, connection) in &p.conn {
943            assert_eq!(
944                connection.get_states(),
945                HashMap::from([
946                    (Role::Client, State::SendBody),
947                    (Role::Server, State::SendResponse),
948                ])
949            );
950        }
951        assert_eq!(p.conn[&Role::Client].get_our_state(), State::SendBody);
952        assert_eq!(p.conn[&Role::Client].get_their_state(), State::SendResponse);
953        assert_eq!(p.conn[&Role::Server].get_our_state(), State::SendResponse);
954        assert_eq!(p.conn[&Role::Server].get_their_state(), State::SendBody);
955        assert_eq!(p.conn[&Role::Client].their_http_version, None);
956        assert_eq!(
957            p.conn[&Role::Server].their_http_version,
958            Some(b"1.1".to_vec())
959        );
960
961        assert_eq!(
962            p.send(
963                Role::Server,
964                vec![Response {
965                    status_code: 100,
966                    headers: vec![].into(),
967                    http_version: b"1.1".to_vec(),
968                    reason: b"".to_vec(),
969                }
970                .into()],
971                None
972            )
973            .unwrap(),
974            b"HTTP/1.1 100 \r\n\r\n".to_vec()
975        );
976
977        assert_eq!(
978            p.send(
979                Role::Server,
980                vec![Response {
981                    status_code: 200,
982                    headers: vec![(b"Content-Length".to_vec(), b"11".to_vec())].into(),
983                    http_version: b"1.1".to_vec(),
984                    reason: b"".to_vec(),
985                }
986                .into()],
987                None
988            )
989            .unwrap(),
990            b"HTTP/1.1 200 \r\nContent-Length: 11\r\n\r\n".to_vec()
991        );
992
993        for (_, connection) in &p.conn {
994            assert_eq!(
995                connection.get_states(),
996                HashMap::from([
997                    (Role::Client, State::SendBody),
998                    (Role::Server, State::SendBody),
999                ])
1000            );
1001        }
1002
1003        assert_eq!(
1004            p.conn[&Role::Client].their_http_version,
1005            Some(b"1.1".to_vec())
1006        );
1007        assert_eq!(
1008            p.conn[&Role::Server].their_http_version,
1009            Some(b"1.1".to_vec())
1010        );
1011
1012        assert_eq!(
1013            p.send(
1014                Role::Client,
1015                vec![Data {
1016                    data: b"12345".to_vec(),
1017                    chunk_start: false,
1018                    chunk_end: false,
1019                }
1020                .into()],
1021                None
1022            )
1023            .unwrap(),
1024            b"12345".to_vec()
1025        );
1026
1027        assert_eq!(
1028            p.send(
1029                Role::Client,
1030                vec![Data {
1031                    data: b"67890".to_vec(),
1032                    chunk_start: false,
1033                    chunk_end: false,
1034                }
1035                .into()],
1036                Some(vec![
1037                    Data {
1038                        data: b"67890".to_vec(),
1039                        chunk_start: false,
1040                        chunk_end: false,
1041                    }
1042                    .into(),
1043                    EndOfMessage::default().into(),
1044                ]),
1045            )
1046            .unwrap(),
1047            b"67890".to_vec()
1048        );
1049
1050        assert_eq!(
1051            p.send(
1052                Role::Client,
1053                vec![EndOfMessage::default().into()],
1054                Some(vec![]),
1055            )
1056            .unwrap(),
1057            b"".to_vec()
1058        );
1059
1060        for (_, connection) in &p.conn {
1061            assert_eq!(
1062                connection.get_states(),
1063                HashMap::from([(Role::Client, State::Done), (Role::Server, State::SendBody),])
1064            );
1065        }
1066
1067        assert_eq!(
1068            p.send(
1069                Role::Server,
1070                vec![Data {
1071                    data: b"1234567890".to_vec(),
1072                    chunk_start: false,
1073                    chunk_end: false,
1074                }
1075                .into()],
1076                None
1077            )
1078            .unwrap(),
1079            b"1234567890".to_vec()
1080        );
1081
1082        assert_eq!(
1083            p.send(
1084                Role::Server,
1085                vec![Data {
1086                    data: b"1".to_vec(),
1087                    chunk_start: false,
1088                    chunk_end: false,
1089                }
1090                .into()],
1091                Some(vec![
1092                    Data {
1093                        data: b"1".to_vec(),
1094                        chunk_start: false,
1095                        chunk_end: false,
1096                    }
1097                    .into(),
1098                    EndOfMessage::default().into(),
1099                ]),
1100            )
1101            .unwrap(),
1102            b"1".to_vec()
1103        );
1104
1105        assert_eq!(
1106            p.send(
1107                Role::Server,
1108                vec![EndOfMessage::default().into()],
1109                Some(vec![]),
1110            )
1111            .unwrap(),
1112            b"".to_vec()
1113        );
1114
1115        for (_, connection) in &p.conn {
1116            assert_eq!(
1117                connection.get_states(),
1118                HashMap::from([(Role::Client, State::Done), (Role::Server, State::Done),])
1119            );
1120        }
1121    }
1122
1123    #[test]
1124    fn test_chunked() {
1125        let mut p = ConnectionPair::new();
1126        assert_eq!(
1127            p.send(
1128                Role::Client,
1129                vec![Request::new(
1130                    b"GET".to_vec(),
1131                    vec![
1132                        (b"Host".to_vec(), b"example.com".to_vec()),
1133                        (b"Transfer-Encoding".to_vec(), b"chunked".to_vec())
1134                    ]
1135                    .into(),
1136                    b"/".to_vec(),
1137                    b"1.1".to_vec(),
1138                )
1139                .unwrap()
1140                .into(),],
1141                None,
1142            )
1143            .unwrap(),
1144            b"GET / HTTP/1.1\r\nHost: example.com\r\nTransfer-Encoding: chunked\r\n\r\n".to_vec()
1145        );
1146        assert_eq!(
1147            p.send(
1148                Role::Client,
1149                vec![Data {
1150                    data: b"1234567890".to_vec(),
1151                    chunk_start: true,
1152                    chunk_end: true,
1153                }
1154                .into()],
1155                None,
1156            )
1157            .unwrap(),
1158            b"a\r\n1234567890\r\n".to_vec()
1159        );
1160        assert_eq!(
1161            p.send(
1162                Role::Client,
1163                vec![Data {
1164                    data: b"abcde".to_vec(),
1165                    chunk_start: true,
1166                    chunk_end: true,
1167                }
1168                .into()],
1169                None,
1170            )
1171            .unwrap(),
1172            b"5\r\nabcde\r\n".to_vec()
1173        );
1174        assert_eq!(
1175            p.send(Role::Client, vec![Data::default().into()], Some(vec![]),)
1176                .unwrap(),
1177            b"".to_vec()
1178        );
1179        assert_eq!(
1180            p.send(
1181                Role::Client,
1182                vec![EndOfMessage {
1183                    headers: vec![(b"hello".to_vec(), b"there".to_vec())].into(),
1184                }
1185                .into()],
1186                None,
1187            )
1188            .unwrap(),
1189            b"0\r\nhello: there\r\n\r\n".to_vec()
1190        );
1191
1192        assert_eq!(
1193            p.send(
1194                Role::Server,
1195                vec![Response {
1196                    status_code: 200,
1197                    headers: vec![
1198                        (b"hello".to_vec(), b"there".to_vec()),
1199                        (b"transfer-encoding".to_vec(), b"chunked".to_vec()),
1200                    ]
1201                    .into(),
1202                    http_version: b"1.1".to_vec(),
1203                    reason: b"".to_vec(),
1204                }
1205                .into()],
1206                None,
1207            )
1208            .unwrap(),
1209            b"HTTP/1.1 200 \r\nhello: there\r\ntransfer-encoding: chunked\r\n\r\n".to_vec()
1210        );
1211        assert_eq!(
1212            p.send(
1213                Role::Server,
1214                vec![Data {
1215                    data: b"54321".to_vec(),
1216                    chunk_start: true,
1217                    chunk_end: true,
1218                }
1219                .into()],
1220                None,
1221            )
1222            .unwrap(),
1223            b"5\r\n54321\r\n".to_vec()
1224        );
1225        assert_eq!(
1226            p.send(
1227                Role::Server,
1228                vec![Data {
1229                    data: b"12345".to_vec(),
1230                    chunk_start: true,
1231                    chunk_end: true,
1232                }
1233                .into()],
1234                None,
1235            )
1236            .unwrap(),
1237            b"5\r\n12345\r\n".to_vec()
1238        );
1239        assert_eq!(
1240            p.send(Role::Server, vec![EndOfMessage::default().into()], None,)
1241                .unwrap(),
1242            b"0\r\n\r\n".to_vec()
1243        );
1244
1245        for (_, connection) in &p.conn {
1246            assert_eq!(
1247                connection.get_states(),
1248                HashMap::from([(Role::Client, State::Done), (Role::Server, State::Done),])
1249            );
1250        }
1251    }
1252
1253    #[test]
1254    fn test_chunk_boundaries() {
1255        let mut conn = Connection::new(Role::Server, None);
1256
1257        let request = b"POST / HTTP/1.1\r\nHost: example.com\r\nTransfer-Encoding: chunked\r\n\r\n";
1258        conn.receive_data(request).unwrap();
1259        assert_eq!(
1260            conn.next_event().unwrap(),
1261            Event::Request(Request {
1262                method: b"POST".to_vec(),
1263                target: b"/".to_vec(),
1264                headers: vec![
1265                    (b"Host".to_vec(), b"example.com".to_vec()),
1266                    (b"Transfer-Encoding".to_vec(), b"chunked".to_vec())
1267                ]
1268                .into(),
1269                http_version: b"1.1".to_vec(),
1270            })
1271        );
1272        assert_eq!(conn.next_event().unwrap(), Event::NeedData {});
1273
1274        conn.receive_data(b"5\r\nhello\r\n").unwrap();
1275        assert_eq!(
1276            conn.next_event().unwrap(),
1277            Event::Data(Data {
1278                data: b"hello".to_vec(),
1279                chunk_start: true,
1280                chunk_end: true,
1281            })
1282        );
1283
1284        conn.receive_data(b"5\r\nhel").unwrap();
1285        assert_eq!(
1286            conn.next_event().unwrap(),
1287            Event::Data(Data {
1288                data: b"hel".to_vec(),
1289                chunk_start: true,
1290                chunk_end: false,
1291            })
1292        );
1293
1294        conn.receive_data(b"l").unwrap();
1295        assert_eq!(
1296            conn.next_event().unwrap(),
1297            Event::Data(Data {
1298                data: b"l".to_vec(),
1299                chunk_start: false,
1300                chunk_end: false,
1301            })
1302        );
1303
1304        conn.receive_data(b"o\r\n").unwrap();
1305        assert_eq!(
1306            conn.next_event().unwrap(),
1307            Event::Data(Data {
1308                data: b"o".to_vec(),
1309                chunk_start: false,
1310                chunk_end: true,
1311            })
1312        );
1313
1314        conn.receive_data(b"5\r\nhello").unwrap();
1315        assert_eq!(
1316            conn.next_event().unwrap(),
1317            Event::Data(Data {
1318                data: b"hello".to_vec(),
1319                chunk_start: true,
1320                chunk_end: true,
1321            })
1322        );
1323
1324        conn.receive_data(b"\r\n").unwrap();
1325        assert_eq!(conn.next_event().unwrap(), Event::NeedData {});
1326
1327        conn.receive_data(b"0\r\n\r\n").unwrap();
1328        assert_eq!(
1329            conn.next_event().unwrap(),
1330            Event::EndOfMessage(EndOfMessage {
1331                headers: vec![].into(),
1332            })
1333        );
1334    }
1335
1336    // def test_client_talking_to_http10_server() -> None:
1337    //     c = Connection(CLIENT)
1338    //     c.send(Request(method="GET", target="/", headers=[("Host", "example.com")]))
1339    //     c.send(EndOfMessage())
1340    //     assert c.our_state is DONE
1341    //     # No content-length, so Http10 framing for body
1342    //     assert receive_and_get(c, b"HTTP/1.0 200 OK\r\n\r\n") == [
1343    //         Response(status_code=200, headers=[], http_version="1.0", reason=b"OK")  # type: ignore[arg-type]
1344    //     ]
1345    //     assert c.our_state is MUST_CLOSE
1346    //     assert receive_and_get(c, b"12345") == [Data(data=b"12345")]
1347    //     assert receive_and_get(c, b"67890") == [Data(data=b"67890")]
1348    //     assert receive_and_get(c, b"") == [EndOfMessage(), ConnectionClosed()]
1349    //     assert c.their_state is CLOSED
1350
1351    #[test]
1352    fn test_client_talking_to_http10_server() {
1353        let mut c = Connection::new(Role::Client, None);
1354        c.send(
1355            Request::new(
1356                b"GET".to_vec(),
1357                vec![(b"Host".to_vec(), b"example.com".to_vec())].into(),
1358                b"/".to_vec(),
1359                b"1.1".to_vec(),
1360            )
1361            .unwrap()
1362            .into(),
1363        )
1364        .unwrap();
1365        c.send(EndOfMessage::default().into()).unwrap();
1366        assert_eq!(c.get_our_state(), State::Done);
1367        assert_eq!(
1368            receive_and_get(&mut c, b"HTTP/1.0 200 OK\r\n\r\n").unwrap(),
1369            vec![Event::NormalResponse(Response {
1370                status_code: 200,
1371                headers: vec![].into(),
1372                http_version: b"1.0".to_vec(),
1373                reason: b"OK".to_vec(),
1374            })],
1375        );
1376        assert_eq!(c.get_our_state(), State::MustClose);
1377        assert_eq!(
1378            receive_and_get(&mut c, b"12345").unwrap(),
1379            vec![Event::Data(Data {
1380                data: b"12345".to_vec(),
1381                chunk_start: false,
1382                chunk_end: false,
1383            })],
1384        );
1385        assert_eq!(
1386            receive_and_get(&mut c, b"67890").unwrap(),
1387            vec![Event::Data(Data {
1388                data: b"67890".to_vec(),
1389                chunk_start: false,
1390                chunk_end: false,
1391            })],
1392        );
1393        assert_eq!(
1394            receive_and_get(&mut c, b"").unwrap(),
1395            vec![
1396                Event::EndOfMessage(EndOfMessage::default()),
1397                Event::ConnectionClosed(ConnectionClosed::default()),
1398            ],
1399        );
1400        assert_eq!(c.get_their_state(), State::Closed);
1401    }
1402
1403    // def test_server_talking_to_http10_client() -> None:
1404    //     c = Connection(SERVER)
1405    //     # No content-length, so no body
1406    //     # NB: no host header
1407    //     assert receive_and_get(c, b"GET / HTTP/1.0\r\n\r\n") == [
1408    //         Request(method="GET", target="/", headers=[], http_version="1.0"),  # type: ignore[arg-type]
1409    //         EndOfMessage(),
1410    //     ]
1411    //     assert c.their_state is MUST_CLOSE
1412
1413    //     # We automatically Connection: close back at them
1414    //     assert (
1415    //         c.send(Response(status_code=200, headers=[]))  # type: ignore[arg-type]
1416    //         == b"HTTP/1.1 200 \r\nConnection: close\r\n\r\n"
1417    //     )
1418
1419    //     assert c.send(Data(data=b"12345")) == b"12345"
1420    //     assert c.send(EndOfMessage()) == b""
1421    //     assert c.our_state is MUST_CLOSE
1422
1423    //     # Check that it works if they do send Content-Length
1424    //     c = Connection(SERVER)
1425    //     # NB: no host header
1426    //     assert receive_and_get(c, b"POST / HTTP/1.0\r\nContent-Length: 10\r\n\r\n1") == [
1427    //         Request(
1428    //             method="POST",
1429    //             target="/",
1430    //             headers=[("Content-Length", "10")],
1431    //             http_version="1.0",
1432    //         ),
1433    //         Data(data=b"1"),
1434    //     ]
1435    //     assert receive_and_get(c, b"234567890") == [Data(data=b"234567890"), EndOfMessage()]
1436    //     assert c.their_state is MUST_CLOSE
1437    //     assert receive_and_get(c, b"") == [ConnectionClosed()]
1438
1439    #[test]
1440    fn test_server_talking_to_http10_client() {
1441        let mut c = Connection::new(Role::Server, None);
1442        // No content-length, so no body
1443        // NB: no host header
1444        assert_eq!(
1445            receive_and_get(&mut c, b"GET / HTTP/1.0\r\n\r\n").unwrap(),
1446            vec![
1447                Event::Request(Request {
1448                    method: b"GET".to_vec(),
1449                    target: b"/".to_vec(),
1450                    headers: vec![].into(),
1451                    http_version: b"1.0".to_vec(),
1452                }),
1453                Event::EndOfMessage(EndOfMessage::default()),
1454            ],
1455        );
1456        assert_eq!(c.get_their_state(), State::MustClose);
1457
1458        // We automatically Connection: close back at them
1459        assert_eq!(
1460            c.send(
1461                Response {
1462                    status_code: 200,
1463                    headers: vec![].into(),
1464                    http_version: b"1.1".to_vec(),
1465                    reason: b"".to_vec(),
1466                }
1467                .into()
1468            )
1469            .unwrap()
1470            .unwrap(),
1471            b"HTTP/1.1 200 \r\nconnection: close\r\n\r\n".to_vec()
1472        );
1473
1474        assert_eq!(
1475            c.send(
1476                Data {
1477                    data: b"12345".to_vec(),
1478                    chunk_start: false,
1479                    chunk_end: false,
1480                }
1481                .into()
1482            )
1483            .unwrap()
1484            .unwrap(),
1485            b"12345".to_vec()
1486        );
1487        assert_eq!(
1488            c.send(EndOfMessage::default().into()).unwrap().unwrap(),
1489            b"".to_vec()
1490        );
1491        assert_eq!(c.get_our_state(), State::MustClose);
1492
1493        // Check that it works if they do send Content-Length
1494        let mut c = Connection::new(Role::Server, None);
1495        // NB: no host header
1496        assert_eq!(
1497            receive_and_get(&mut c, b"POST / HTTP/1.0\r\nContent-Length: 10\r\n\r\n1").unwrap(),
1498            vec![
1499                Event::Request(Request {
1500                    method: b"POST".to_vec(),
1501                    target: b"/".to_vec(),
1502                    headers: vec![(b"Content-Length".to_vec(), b"10".to_vec())].into(),
1503                    http_version: b"1.0".to_vec(),
1504                }),
1505                Event::Data(Data {
1506                    data: b"1".to_vec(),
1507                    chunk_start: false,
1508                    chunk_end: false,
1509                }),
1510            ],
1511        );
1512        assert_eq!(
1513            receive_and_get(&mut c, b"234567890").unwrap(),
1514            vec![
1515                Event::Data(Data {
1516                    data: b"234567890".to_vec(),
1517                    chunk_start: false,
1518                    chunk_end: false,
1519                }),
1520                Event::EndOfMessage(EndOfMessage::default()),
1521            ],
1522        );
1523        assert_eq!(c.get_their_state(), State::MustClose);
1524        assert_eq!(
1525            receive_and_get(&mut c, b"").unwrap(),
1526            vec![Event::ConnectionClosed(ConnectionClosed::default())],
1527        );
1528    }
1529
1530    // def test_automatic_transfer_encoding_in_response() -> None:
1531    //     # Check that in responses, the user can specify either Transfer-Encoding:
1532    //     # chunked or no framing at all, and in both cases we automatically select
1533    //     # the right option depending on whether the peer speaks HTTP/1.0 or
1534    //     # HTTP/1.1
1535    //     for user_headers in [
1536    //         [("Transfer-Encoding", "chunked")],
1537    //         [],
1538    //         # In fact, this even works if Content-Length is set,
1539    //         # because if both are set then Transfer-Encoding wins
1540    //         [("Transfer-Encoding", "chunked"), ("Content-Length", "100")],
1541    //     ]:
1542    //         user_headers = cast(List[Tuple[str, str]], user_headers)
1543    //         p = ConnectionPair()
1544    //         p.send(
1545    //             CLIENT,
1546    //             [
1547    //                 Request(method="GET", target="/", headers=[("Host", "example.com")]),
1548    //                 EndOfMessage(),
1549    //             ],
1550    //         )
1551    //         # When speaking to HTTP/1.1 client, all of the above cases get
1552    //         # normalized to Transfer-Encoding: chunked
1553    //         p.send(
1554    //             SERVER,
1555    //             Response(status_code=200, headers=user_headers),
1556    //             expect=Response(
1557    //                 status_code=200, headers=[("Transfer-Encoding", "chunked")]
1558    //             ),
1559    //         )
1560
1561    //         # When speaking to HTTP/1.0 client, all of the above cases get
1562    //         # normalized to no-framing-headers
1563    //         c = Connection(SERVER)
1564    //         receive_and_get(c, b"GET / HTTP/1.0\r\n\r\n")
1565    //         assert (
1566    //             c.send(Response(status_code=200, headers=user_headers))
1567    //             == b"HTTP/1.1 200 \r\nConnection: close\r\n\r\n"
1568    //         )
1569    //         assert c.send(Data(data=b"12345")) == b"12345"
1570
1571    #[test]
1572    fn test_automatic_transfer_encoding_in_response() {
1573        // Check that in responses, the user can specify either Transfer-Encoding:
1574        // chunked or no framing at all, and in both cases we automatically select
1575        // the right option depending on whether the peer speaks HTTP/1.0 or
1576        // HTTP/1.1
1577        for user_headers in vec![
1578            vec![(b"Transfer-Encoding".to_vec(), b"chunked".to_vec())],
1579            vec![],
1580            // In fact, this even works if Content-Length is set,
1581            // because if both are set then Transfer-Encoding wins
1582            vec![
1583                (b"Transfer-Encoding".to_vec(), b"chunked".to_vec()),
1584                (b"Content-Length".to_vec(), b"100".to_vec()),
1585            ],
1586        ] {
1587            let mut p = ConnectionPair::new();
1588            p.send(
1589                Role::Client,
1590                vec![
1591                    Request::new(
1592                        b"GET".to_vec(),
1593                        vec![(b"Host".to_vec(), b"example.com".to_vec())].into(),
1594                        b"/".to_vec(),
1595                        b"1.1".to_vec(),
1596                    )
1597                    .unwrap()
1598                    .into(),
1599                    EndOfMessage::default().into(),
1600                ],
1601                None,
1602            )
1603            .unwrap();
1604            // When speaking to HTTP/1.1 client, all of the above cases get
1605            // normalized to Transfer-Encoding: chunked
1606            p.send(
1607                Role::Server,
1608                vec![Response {
1609                    status_code: 200,
1610                    headers: user_headers.clone().into(),
1611                    http_version: b"1.1".to_vec(),
1612                    reason: b"".to_vec(),
1613                }
1614                .into()],
1615                Some(vec![Response {
1616                    status_code: 200,
1617                    headers: vec![(b"transfer-encoding".to_vec(), b"chunked".to_vec())].into(),
1618                    http_version: b"1.1".to_vec(),
1619                    reason: b"".to_vec(),
1620                }
1621                .into()]),
1622            )
1623            .unwrap();
1624
1625            // When speaking to HTTP/1.0 client, all of the above cases get
1626            // normalized to no-framing-headers
1627            let mut c = Connection::new(Role::Server, None);
1628            receive_and_get(&mut c, b"GET / HTTP/1.0\r\n\r\n").unwrap();
1629            assert_eq!(
1630                c.send(
1631                    Response {
1632                        status_code: 200,
1633                        headers: user_headers.clone().into(),
1634                        http_version: b"1.1".to_vec(),
1635                        reason: b"".to_vec(),
1636                    }
1637                    .into()
1638                )
1639                .unwrap()
1640                .unwrap(),
1641                b"HTTP/1.1 200 \r\nconnection: close\r\n\r\n".to_vec()
1642            );
1643            assert_eq!(
1644                c.send(
1645                    Data {
1646                        data: b"12345".to_vec(),
1647                        chunk_start: false,
1648                        chunk_end: false,
1649                    }
1650                    .into()
1651                )
1652                .unwrap()
1653                .unwrap(),
1654                b"12345".to_vec()
1655            );
1656        }
1657    }
1658
1659    // def test_automagic_connection_close_handling() -> None:
1660    //     p = ConnectionPair()
1661    //     # If the user explicitly sets Connection: close, then we notice and
1662    //     # respect it
1663    //     p.send(
1664    //         CLIENT,
1665    //         [
1666    //             Request(
1667    //                 method="GET",
1668    //                 target="/",
1669    //                 headers=[("Host", "example.com"), ("Connection", "close")],
1670    //             ),
1671    //             EndOfMessage(),
1672    //         ],
1673    //     )
1674    //     for conn in p.conns:
1675    //         assert conn.states[CLIENT] is MUST_CLOSE
1676    //     # And if the client sets it, the server automatically echoes it back
1677    //     p.send(
1678    //         SERVER,
1679    //         # no header here...
1680    //         [Response(status_code=204, headers=[]), EndOfMessage()],  # type: ignore[arg-type]
1681    //         # ...but oh look, it arrived anyway
1682    //         expect=[
1683    //             Response(status_code=204, headers=[("connection", "close")]),
1684    //             EndOfMessage(),
1685    //         ],
1686    //     )
1687    //     for conn in p.conns:
1688    //         assert conn.states == {CLIENT: MUST_CLOSE, SERVER: MUST_CLOSE}
1689
1690    #[test]
1691    fn test_automagic_connection_close_handling() {
1692        let mut p = ConnectionPair::new();
1693        // If the user explicitly sets Connection: close, then we notice and
1694        // respect it
1695        p.send(
1696            Role::Client,
1697            vec![
1698                Request::new(
1699                    b"GET".to_vec(),
1700                    vec![
1701                        (b"Host".to_vec(), b"example.com".to_vec()),
1702                        (b"Connection".to_vec(), b"close".to_vec()),
1703                    ]
1704                    .into(),
1705                    b"/".to_vec(),
1706                    b"1.1".to_vec(),
1707                )
1708                .unwrap()
1709                .into(),
1710                EndOfMessage::default().into(),
1711            ],
1712            None,
1713        )
1714        .unwrap();
1715        for (_, connection) in &p.conn {
1716            assert_eq!(connection.get_states()[&Role::Client], State::MustClose);
1717        }
1718        // And if the client sets it, the server automatically echoes it back
1719        p.send(
1720            Role::Server,
1721            vec![
1722                Response {
1723                    status_code: 204,
1724                    headers: vec![].into(),
1725                    http_version: b"1.1".to_vec(),
1726                    reason: b"".to_vec(),
1727                }
1728                .into(),
1729                EndOfMessage::default().into(),
1730            ],
1731            Some(vec![
1732                Response {
1733                    status_code: 204,
1734                    headers: vec![(b"connection".to_vec(), b"close".to_vec())].into(),
1735                    http_version: b"1.1".to_vec(),
1736                    reason: b"".to_vec(),
1737                }
1738                .into(),
1739                EndOfMessage::default().into(),
1740            ]),
1741        )
1742        .unwrap();
1743        for (_, connection) in &p.conn {
1744            assert_eq!(
1745                connection.get_states(),
1746                HashMap::from([
1747                    (Role::Client, State::MustClose),
1748                    (Role::Server, State::MustClose),
1749                ])
1750            );
1751        }
1752    }
1753
1754    // def test_100_continue() -> None:
1755    //     def setup() -> ConnectionPair:
1756    //         p = ConnectionPair()
1757    //         p.send(
1758    //             CLIENT,
1759    //             Request(
1760    //                 method="GET",
1761    //                 target="/",
1762    //                 headers=[
1763    //                     ("Host", "example.com"),
1764    //                     ("Content-Length", "100"),
1765    //                     ("Expect", "100-continue"),
1766    //                 ],
1767    //             ),
1768    //         )
1769    //         for conn in p.conns:
1770    //             assert conn.get_client_is_waiting_for_100_continue()
1771    //         assert not p.conn[CLIENT].they_are_waiting_for_100_continue
1772    //         assert p.conn[SERVER].they_are_waiting_for_100_continue
1773    //         return p
1774
1775    //     # Disabled by 100 Continue
1776    //     p = setup()
1777    //     p.send(SERVER, InformationalResponse(status_code=100, headers=[]))  # type: ignore[arg-type]
1778    //     for conn in p.conns:
1779    //         assert not conn.get_client_is_waiting_for_100_continue()
1780    //         assert not conn.they_are_waiting_for_100_continue
1781
1782    //     # Disabled by a real response
1783    //     p = setup()
1784    //     p.send(
1785    //         SERVER, Response(status_code=200, headers=[("Transfer-Encoding", "chunked")])
1786    //     )
1787    //     for conn in p.conns:
1788    //         assert not conn.get_client_is_waiting_for_100_continue()
1789    //         assert not conn.they_are_waiting_for_100_continue
1790
1791    //     # Disabled by the client going ahead and sending stuff anyway
1792    //     p = setup()
1793    //     p.send(CLIENT, Data(data=b"12345"))
1794    //     for conn in p.conns:
1795    //         assert not conn.get_client_is_waiting_for_100_continue()
1796    //         assert not conn.they_are_waiting_for_100_continue
1797
1798    #[test]
1799    fn test_100_continue() {
1800        fn setup() -> ConnectionPair {
1801            let mut p = ConnectionPair::new();
1802            p.send(
1803                Role::Client,
1804                vec![Request::new(
1805                    b"GET".to_vec(),
1806                    vec![
1807                        (b"Host".to_vec(), b"example.com".to_vec()),
1808                        (b"Content-Length".to_vec(), b"100".to_vec()),
1809                        (b"Expect".to_vec(), b"100-continue".to_vec()),
1810                    ]
1811                    .into(),
1812                    b"/".to_vec(),
1813                    b"1.1".to_vec(),
1814                )
1815                .unwrap()
1816                .into()],
1817                None,
1818            )
1819            .unwrap();
1820            for (_, connection) in &p.conn {
1821                assert!(connection.get_client_is_waiting_for_100_continue());
1822            }
1823            assert!(!p.conn[&Role::Client].get_they_are_waiting_for_100_continue());
1824            assert!(p.conn[&Role::Server].get_they_are_waiting_for_100_continue());
1825            p
1826        }
1827
1828        // Disabled by 100 Continue
1829        let mut p = setup();
1830        p.send(
1831            Role::Server,
1832            vec![Response {
1833                status_code: 100,
1834                headers: vec![].into(),
1835                http_version: b"1.1".to_vec(),
1836                reason: b"".to_vec(),
1837            }
1838            .into()],
1839            None,
1840        )
1841        .unwrap();
1842        for (_, connection) in &p.conn {
1843            assert!(!connection.get_client_is_waiting_for_100_continue());
1844            assert!(!connection.get_they_are_waiting_for_100_continue());
1845        }
1846
1847        // Disabled by a real response
1848        let mut p = setup();
1849        p.send(
1850            Role::Server,
1851            vec![Response {
1852                status_code: 200,
1853                headers: vec![(b"transfer-encoding".to_vec(), b"chunked".to_vec())].into(),
1854                http_version: b"1.1".to_vec(),
1855                reason: b"".to_vec(),
1856            }
1857            .into()],
1858            None,
1859        )
1860        .unwrap();
1861        for (_, connection) in &p.conn {
1862            assert!(!connection.get_client_is_waiting_for_100_continue());
1863            assert!(!connection.get_they_are_waiting_for_100_continue());
1864        }
1865
1866        // Disabled by the client going ahead and sending stuff anyway
1867        let mut p = setup();
1868        p.send(
1869            Role::Client,
1870            vec![Data {
1871                data: b"12345".to_vec(),
1872                chunk_start: false,
1873                chunk_end: false,
1874            }
1875            .into()],
1876            None,
1877        )
1878        .unwrap();
1879        for (_, connection) in &p.conn {
1880            assert!(!connection.get_client_is_waiting_for_100_continue());
1881            assert!(!connection.get_they_are_waiting_for_100_continue());
1882        }
1883
1884        // Disabled by the client going ahead and sending stuff anyway
1885        let mut p = setup();
1886        p.send(
1887            Role::Client,
1888            vec![Data {
1889                data: b"12345".to_vec(),
1890                chunk_start: false,
1891                chunk_end: false,
1892            }
1893            .into()],
1894            None,
1895        )
1896        .unwrap();
1897        for (_, connection) in &p.conn {
1898            assert!(!connection.get_client_is_waiting_for_100_continue());
1899            assert!(!connection.get_they_are_waiting_for_100_continue());
1900        }
1901
1902        // Disabled by the client going ahead and sending stuff anyway
1903        let mut p = setup();
1904        p.send(
1905            Role::Client,
1906            vec![Data {
1907                data: b"12345".to_vec(),
1908                chunk_start: false,
1909                chunk_end: false,
1910            }
1911            .into()],
1912            None,
1913        )
1914        .unwrap();
1915        for (_, connection) in &p.conn {
1916            assert!(!connection.get_client_is_waiting_for_100_continue());
1917            assert!(!connection.get_they_are_waiting_for_100_continue());
1918        }
1919    }
1920
1921    // def test_max_incomplete_event_size_countermeasure() -> None:
1922    //     # Infinitely long headers are definitely not okay
1923    //     c = Connection(SERVER)
1924    //     c.receive_data(b"GET / HTTP/1.0\r\nEndless: ")
1925    //     assert c.next_event() is NEED_DATA
1926    //     with pytest.raises(RemoteProtocolError):
1927    //         while True:
1928    //             c.receive_data(b"a" * 1024)
1929    //             c.next_event()
1930
1931    //     # Checking that the same header is accepted / rejected depending on the
1932    //     # max_incomplete_event_size setting:
1933    //     c = Connection(SERVER, max_incomplete_event_size=5000)
1934    //     c.receive_data(b"GET / HTTP/1.0\r\nBig: ")
1935    //     c.receive_data(b"a" * 4000)
1936    //     c.receive_data(b"\r\n\r\n")
1937    //     assert get_all_events(c) == [
1938    //         Request(
1939    //             method="GET", target="/", http_version="1.0", headers=[("big", "a" * 4000)]
1940    //         ),
1941    //         EndOfMessage(),
1942    //     ]
1943
1944    //     c = Connection(SERVER, max_incomplete_event_size=4000)
1945    //     c.receive_data(b"GET / HTTP/1.0\r\nBig: ")
1946    //     c.receive_data(b"a" * 4000)
1947    //     with pytest.raises(RemoteProtocolError):
1948    //         c.next_event()
1949
1950    //     # Temporarily exceeding the size limit is fine, as long as its done with
1951    //     # complete events:
1952    //     c = Connection(SERVER, max_incomplete_event_size=5000)
1953    //     c.receive_data(b"GET / HTTP/1.0\r\nContent-Length: 10000")
1954    //     c.receive_data(b"\r\n\r\n" + b"a" * 10000)
1955    //     assert get_all_events(c) == [
1956    //         Request(
1957    //             method="GET",
1958    //             target="/",
1959    //             http_version="1.0",
1960    //             headers=[("Content-Length", "10000")],
1961    //         ),
1962    //         Data(data=b"a" * 10000),
1963    //         EndOfMessage(),
1964    //     ]
1965
1966    //     c = Connection(SERVER, max_incomplete_event_size=100)
1967    //     # Two pipelined requests to create a way-too-big receive buffer... but
1968    //     # it's fine because we're not checking
1969    //     c.receive_data(
1970    //         b"GET /1 HTTP/1.1\r\nHost: a\r\n\r\n"
1971    //         b"GET /2 HTTP/1.1\r\nHost: b\r\n\r\n" + b"X" * 1000
1972    //     )
1973    //     assert get_all_events(c) == [
1974    //         Request(method="GET", target="/1", headers=[("host", "a")]),
1975    //         EndOfMessage(),
1976    //     ]
1977    //     # Even more data comes in, still no problem
1978    //     c.receive_data(b"X" * 1000)
1979    //     # We can respond and reuse to get the second pipelined request
1980    //     c.send(Response(status_code=200, headers=[]))  # type: ignore[arg-type]
1981    //     c.send(EndOfMessage())
1982    //     c.start_next_cycle()
1983    //     assert get_all_events(c) == [
1984    //         Request(method="GET", target="/2", headers=[("host", "b")]),
1985    //         EndOfMessage(),
1986    //     ]
1987    //     # But once we unpause and try to read the next message, and find that it's
1988    //     # incomplete and the buffer is *still* way too large, then *that's* a
1989    //     # problem:
1990    //     c.send(Response(status_code=200, headers=[]))  # type: ignore[arg-type]
1991    //     c.send(EndOfMessage())
1992    //     c.start_next_cycle()
1993    //     with pytest.raises(RemoteProtocolError):
1994    //         c.next_event()
1995
1996    #[test]
1997    fn test_max_incomplete_event_size_countermeasure() {
1998        // Infinitely long headers are definitely not okay
1999        let mut c = Connection::new(Role::Server, Some(5000));
2000        c.receive_data(b"GET / HTTP/1.0\r\nEndless: ").unwrap();
2001        assert_eq!(c.next_event().unwrap(), Event::NeedData {});
2002
2003        // Checking that the same header is accepted / rejected depending on the
2004        // max_incomplete_event_size setting:
2005        let mut c = Connection::new(Role::Server, Some(5000));
2006        c.receive_data(b"GET / HTTP/1.0\r\nBig: ").unwrap();
2007        c.receive_data(&vec![b'a'; 4000]).unwrap();
2008        c.receive_data(b"\r\n\r\n").unwrap();
2009        assert_eq!(
2010            get_all_events(&mut c).unwrap(),
2011            vec![
2012                Event::Request(Request {
2013                    method: b"GET".to_vec(),
2014                    target: b"/".to_vec(),
2015                    headers: vec![(b"Big".to_vec(), vec![b'a'; 4000])].into(),
2016                    http_version: b"1.0".to_vec(),
2017                }),
2018                Event::EndOfMessage(EndOfMessage::default()),
2019            ],
2020        );
2021
2022        let mut c = Connection::new(Role::Server, Some(4000));
2023        c.receive_data(b"GET / HTTP/1.0\r\nBig: ").unwrap();
2024        c.receive_data(&vec![b'a'; 4000]).unwrap();
2025        assert!(match c.next_event().unwrap_err() {
2026            ProtocolError::RemoteProtocolError(_) => true,
2027            _ => false,
2028        });
2029
2030        // Temporarily exceeding the size limit is fine, as long as its done with
2031        // complete events:
2032        let mut c = Connection::new(Role::Server, Some(5000));
2033        c.receive_data(b"GET / HTTP/1.0\r\nContent-Length: 10000")
2034            .unwrap();
2035        c.receive_data(b"\r\n\r\n").unwrap();
2036        c.receive_data(&vec![b'a'; 10000]).unwrap();
2037        assert_eq!(
2038            get_all_events(&mut c).unwrap(),
2039            vec![
2040                Event::Request(Request {
2041                    method: b"GET".to_vec(),
2042                    target: b"/".to_vec(),
2043                    headers: vec![(b"Content-Length".to_vec(), b"10000".to_vec())].into(),
2044                    http_version: b"1.0".to_vec(),
2045                }),
2046                Event::Data(Data {
2047                    data: vec![b'a'; 10000],
2048                    chunk_start: false,
2049                    chunk_end: false,
2050                }),
2051                Event::EndOfMessage(EndOfMessage::default()),
2052            ],
2053        );
2054
2055        let mut c = Connection::new(Role::Server, Some(100));
2056        // Two pipelined requests to create a way-too-big receive buffer... but
2057        // it's fine because we're not checking
2058        c.receive_data(
2059            b"GET /1 HTTP/1.1\r\nHost: a\r\n\r\n"
2060                .to_vec()
2061                .into_iter()
2062                .chain(b"GET /2 HTTP/1.1\r\nHost: b\r\n\r\n".to_vec().into_iter())
2063                .chain(vec![b'X'; 1000].into_iter())
2064                .collect::<Vec<u8>>()
2065                .as_slice(),
2066        )
2067        .unwrap();
2068        assert_eq!(
2069            get_all_events(&mut c).unwrap(),
2070            vec![
2071                Event::Request(Request {
2072                    method: b"GET".to_vec(),
2073                    target: b"/1".to_vec(),
2074                    headers: vec![(b"Host".to_vec(), b"a".to_vec())].into(),
2075                    http_version: b"1.1".to_vec(),
2076                }),
2077                Event::EndOfMessage(EndOfMessage::default()),
2078            ],
2079        );
2080        // Even more data comes in, still no problem
2081        c.receive_data(&vec![b'X'; 1000]).unwrap();
2082        // We can respond and reuse to get the second pipelined request
2083        c.send(
2084            Response {
2085                status_code: 200,
2086                headers: vec![].into(),
2087                http_version: b"1.1".to_vec(),
2088                reason: b"".to_vec(),
2089            }
2090            .into(),
2091        )
2092        .unwrap();
2093        c.send(EndOfMessage::default().into()).unwrap();
2094        c.start_next_cycle().unwrap();
2095        assert_eq!(
2096            get_all_events(&mut c).unwrap(),
2097            vec![
2098                Event::Request(Request {
2099                    method: b"GET".to_vec(),
2100                    target: b"/2".to_vec(),
2101                    headers: vec![(b"Host".to_vec(), b"b".to_vec())].into(),
2102                    http_version: b"1.1".to_vec(),
2103                }),
2104                Event::EndOfMessage(EndOfMessage::default()),
2105            ],
2106        );
2107        // But once we unpause and try to read the next message, and find that it's
2108        // incomplete and the buffer is *still* way too large, then *that's* a
2109        // problem:
2110        c.send(
2111            Response {
2112                status_code: 200,
2113                headers: vec![].into(),
2114                http_version: b"1.1".to_vec(),
2115                reason: b"".to_vec(),
2116            }
2117            .into(),
2118        )
2119        .unwrap();
2120        c.send(EndOfMessage::default().into()).unwrap();
2121        c.start_next_cycle().unwrap();
2122        assert!(match c.next_event().unwrap_err() {
2123            ProtocolError::RemoteProtocolError(_) => true,
2124            _ => false,
2125        });
2126
2127        // Check that we can still send data after this happens
2128        let mut c = Connection::new(Role::Server, Some(100));
2129        // Two pipelined requests to create a way-too-big receive buffer... but
2130        // it's fine because we're not checking
2131        c.receive_data(
2132            b"GET /1 HTTP/1.1\r\nHost: a\r\n\r\n"
2133                .to_vec()
2134                .into_iter()
2135                .chain(b"GET /2 HTTP/1.1\r\nHost: b\r\n\r\n".to_vec().into_iter())
2136                .chain(vec![b'X'; 1000].into_iter())
2137                .collect::<Vec<u8>>()
2138                .as_slice(),
2139        )
2140        .unwrap();
2141        assert_eq!(
2142            get_all_events(&mut c).unwrap(),
2143            vec![
2144                Event::Request(Request {
2145                    method: b"GET".to_vec(),
2146                    target: b"/1".to_vec(),
2147                    headers: vec![(b"Host".to_vec(), b"a".to_vec())].into(),
2148                    http_version: b"1.1".to_vec(),
2149                }),
2150                Event::EndOfMessage(EndOfMessage::default()),
2151            ],
2152        );
2153        // Even more data comes in, still no problem
2154        c.receive_data(&vec![b'X'; 1000]).unwrap();
2155        // We can respond and reuse to get the second pipelined request
2156        c.send(
2157            Response {
2158                status_code: 200,
2159                headers: vec![].into(),
2160                http_version: b"1.1".to_vec(),
2161                reason: b"".to_vec(),
2162            }
2163            .into(),
2164        )
2165        .unwrap();
2166        c.send(EndOfMessage::default().into()).unwrap();
2167        c.start_next_cycle().unwrap();
2168        assert_eq!(
2169            get_all_events(&mut c).unwrap(),
2170            vec![
2171                Event::Request(Request {
2172                    method: b"GET".to_vec(),
2173                    target: b"/2".to_vec(),
2174                    headers: vec![(b"Host".to_vec(), b"b".to_vec())].into(),
2175                    http_version: b"1.1".to_vec(),
2176                }),
2177                Event::EndOfMessage(EndOfMessage::default()),
2178            ],
2179        );
2180        // But once we unpause and try to read the next message, and find that it's
2181        // incomplete and the buffer is *still* way too large, then *that's* a
2182        // problem:
2183        c.send(
2184            Response {
2185                status_code: 200,
2186                headers: vec![].into(),
2187                http_version: b"1.1".to_vec(),
2188                reason: b"".to_vec(),
2189            }
2190            .into(),
2191        )
2192        .unwrap();
2193        c.send(EndOfMessage::default().into()).unwrap();
2194        c.start_next_cycle().unwrap();
2195        assert!(match c.next_event().unwrap_err() {
2196            ProtocolError::RemoteProtocolError(_) => true,
2197            _ => false,
2198        });
2199    }
2200
2201    // def test_reuse_simple() -> None:
2202    //     p = ConnectionPair()
2203    //     p.send(
2204    //         CLIENT,
2205    //         [Request(method="GET", target="/", headers=[("Host", "a")]), EndOfMessage()],
2206    //     )
2207    //     p.send(
2208    //         SERVER,
2209    //         [
2210    //             Response(status_code=200, headers=[(b"transfer-encoding", b"chunked")]),
2211    //             EndOfMessage(),
2212    //         ],
2213    //     )
2214    //     for conn in p.conns:
2215    //         assert conn.states == {CLIENT: DONE, SERVER: DONE}
2216    //         conn.start_next_cycle()
2217
2218    //     p.send(
2219    //         CLIENT,
2220    //         [
2221    //             Request(method="DELETE", target="/foo", headers=[("Host", "a")]),
2222    //             EndOfMessage(),
2223    //         ],
2224    //     )
2225    //     p.send(
2226    //         SERVER,
2227    //         [
2228    //             Response(status_code=404, headers=[(b"transfer-encoding", b"chunked")]),
2229    //             EndOfMessage(),
2230    //         ],
2231    //     )
2232
2233    #[test]
2234    fn test_reuse_simple() {
2235        let mut p = ConnectionPair::new();
2236        p.send(
2237            Role::Client,
2238            vec![
2239                Request::new(
2240                    b"GET".to_vec(),
2241                    vec![(b"Host".to_vec(), b"a".to_vec())].into(),
2242                    b"/".to_vec(),
2243                    b"1.1".to_vec(),
2244                )
2245                .unwrap()
2246                .into(),
2247                EndOfMessage::default().into(),
2248            ],
2249            None,
2250        )
2251        .unwrap();
2252        p.send(
2253            Role::Server,
2254            vec![
2255                Response {
2256                    status_code: 200,
2257                    headers: vec![(b"transfer-encoding".to_vec(), b"chunked".to_vec())].into(),
2258                    http_version: b"1.1".to_vec(),
2259                    reason: b"".to_vec(),
2260                }
2261                .into(),
2262                EndOfMessage::default().into(),
2263            ],
2264            None,
2265        )
2266        .unwrap();
2267        for (_, connection) in &mut p.conn {
2268            assert_eq!(
2269                connection.get_states(),
2270                HashMap::from([(Role::Client, State::Done), (Role::Server, State::Done),])
2271            );
2272            connection.start_next_cycle().unwrap();
2273        }
2274
2275        p.send(
2276            Role::Client,
2277            vec![
2278                Request::new(
2279                    b"DELETE".to_vec(),
2280                    vec![(b"Host".to_vec(), b"a".to_vec())].into(),
2281                    b"/foo".to_vec(),
2282                    b"1.1".to_vec(),
2283                )
2284                .unwrap()
2285                .into(),
2286                EndOfMessage::default().into(),
2287            ],
2288            None,
2289        )
2290        .unwrap();
2291        p.send(
2292            Role::Server,
2293            vec![
2294                Response {
2295                    status_code: 404,
2296                    headers: vec![(b"transfer-encoding".to_vec(), b"chunked".to_vec())].into(),
2297                    http_version: b"1.1".to_vec(),
2298                    reason: b"".to_vec(),
2299                }
2300                .into(),
2301                EndOfMessage::default().into(),
2302            ],
2303            None,
2304        )
2305        .unwrap();
2306    }
2307
2308    // def test_pipelining() -> None:
2309    //     # Client doesn't support pipelining, so we have to do this by hand
2310    //     c = Connection(SERVER)
2311    //     assert c.next_event() is NEED_DATA
2312    //     # 3 requests all bunched up
2313    //     c.receive_data(
2314    //         b"GET /1 HTTP/1.1\r\nHost: a.com\r\nContent-Length: 5\r\n\r\n"
2315    //         b"12345"
2316    //         b"GET /2 HTTP/1.1\r\nHost: a.com\r\nContent-Length: 5\r\n\r\n"
2317    //         b"67890"
2318    //         b"GET /3 HTTP/1.1\r\nHost: a.com\r\n\r\n"
2319    //     )
2320    //     assert get_all_events(c) == [
2321    //         Request(
2322    //             method="GET",
2323    //             target="/1",
2324    //             headers=[("Host", "a.com"), ("Content-Length", "5")],
2325    //         ),
2326    //         Data(data=b"12345"),
2327    //         EndOfMessage(),
2328    //     ]
2329    //     assert c.their_state is DONE
2330    //     assert c.our_state is SEND_RESPONSE
2331
2332    //     assert c.next_event() is PAUSED
2333
2334    //     c.send(Response(status_code=200, headers=[]))  # type: ignore[arg-type]
2335    //     c.send(EndOfMessage())
2336    //     assert c.their_state is DONE
2337    //     assert c.our_state is DONE
2338
2339    //     c.start_next_cycle()
2340
2341    //     assert get_all_events(c) == [
2342    //         Request(
2343    //             method="GET",
2344    //             target="/2",
2345    //             headers=[("Host", "a.com"), ("Content-Length", "5")],
2346    //         ),
2347    //         Data(data=b"67890"),
2348    //         EndOfMessage(),
2349    //     ]
2350    //     assert c.next_event() is PAUSED
2351    //     c.send(Response(status_code=200, headers=[]))  # type: ignore[arg-type]
2352    //     c.send(EndOfMessage())
2353    //     c.start_next_cycle()
2354
2355    //     assert get_all_events(c) == [
2356    //         Request(method="GET", target="/3", headers=[("Host", "a.com")]),
2357    //         EndOfMessage(),
2358    //     ]
2359    //     # Doesn't pause this time, no trailing data
2360    //     assert c.next_event() is NEED_DATA
2361    //     c.send(Response(status_code=200, headers=[]))  # type: ignore[arg-type]
2362    //     c.send(EndOfMessage())
2363
2364    //     # Arrival of more data triggers pause
2365    //     assert c.next_event() is NEED_DATA
2366    //     c.receive_data(b"SADF")
2367    //     assert c.next_event() is PAUSED
2368    //     assert c.trailing_data == (b"SADF", False)
2369    //     # If EOF arrives while paused, we don't see that either:
2370    //     c.receive_data(b"")
2371    //     assert c.trailing_data == (b"SADF", True)
2372    //     assert c.next_event() is PAUSED
2373    //     c.receive_data(b"")
2374    //     assert c.next_event() is PAUSED
2375
2376    #[test]
2377    fn test_pipelining() {
2378        // Client doesn't support pipelining, so we have to do this by hand
2379        let mut c = Connection::new(Role::Server, None);
2380        assert_eq!(c.next_event().unwrap(), Event::NeedData {});
2381
2382        // 3 requests all bunched up
2383        c.receive_data(
2384            &vec![
2385                b"GET /1 HTTP/1.1\r\nHost: a.com\r\nContent-Length: 5\r\n\r\n".to_vec(),
2386                b"12345".to_vec(),
2387                b"GET /2 HTTP/1.1\r\nHost: a.com\r\nContent-Length: 5\r\n\r\n".to_vec(),
2388                b"67890".to_vec(),
2389                b"GET /3 HTTP/1.1\r\nHost: a.com\r\n\r\n".to_vec(),
2390            ]
2391            .into_iter()
2392            .flatten()
2393            .collect::<Vec<u8>>(),
2394        )
2395        .unwrap();
2396        assert_eq!(
2397            get_all_events(&mut c).unwrap(),
2398            vec![
2399                Event::Request(Request {
2400                    method: b"GET".to_vec(),
2401                    target: b"/1".to_vec(),
2402                    headers: vec![
2403                        (b"Host".to_vec(), b"a.com".to_vec()),
2404                        (b"Content-Length".to_vec(), b"5".to_vec())
2405                    ]
2406                    .into(),
2407                    http_version: b"1.1".to_vec(),
2408                }),
2409                Event::Data(Data {
2410                    data: b"12345".to_vec(),
2411                    chunk_start: false,
2412                    chunk_end: false,
2413                }),
2414                Event::EndOfMessage(EndOfMessage::default()),
2415            ],
2416        );
2417        assert_eq!(c.get_their_state(), State::Done);
2418        assert_eq!(c.get_our_state(), State::SendResponse);
2419
2420        assert_eq!(c.next_event().unwrap(), Event::Paused {});
2421
2422        c.send(
2423            Response {
2424                status_code: 200,
2425                headers: vec![].into(),
2426                http_version: b"1.1".to_vec(),
2427                reason: b"".to_vec(),
2428            }
2429            .into(),
2430        )
2431        .unwrap();
2432        c.send(EndOfMessage::default().into()).unwrap();
2433        assert_eq!(c.get_their_state(), State::Done);
2434        assert_eq!(c.get_our_state(), State::Done);
2435
2436        c.start_next_cycle().unwrap();
2437
2438        assert_eq!(
2439            get_all_events(&mut c).unwrap(),
2440            vec![
2441                Event::Request(Request {
2442                    method: b"GET".to_vec(),
2443                    target: b"/2".to_vec(),
2444                    headers: vec![
2445                        (b"Host".to_vec(), b"a.com".to_vec()),
2446                        (b"Content-Length".to_vec(), b"5".to_vec())
2447                    ]
2448                    .into(),
2449                    http_version: b"1.1".to_vec(),
2450                }),
2451                Event::Data(Data {
2452                    data: b"67890".to_vec(),
2453                    chunk_start: false,
2454                    chunk_end: false,
2455                }),
2456                Event::EndOfMessage(EndOfMessage::default()),
2457            ],
2458        );
2459        assert_eq!(c.next_event().unwrap(), Event::Paused {});
2460        c.send(
2461            Response {
2462                status_code: 200,
2463                headers: vec![].into(),
2464                http_version: b"1.1".to_vec(),
2465                reason: b"".to_vec(),
2466            }
2467            .into(),
2468        )
2469        .unwrap();
2470        c.send(EndOfMessage::default().into()).unwrap();
2471        c.start_next_cycle().unwrap();
2472
2473        assert_eq!(
2474            get_all_events(&mut c).unwrap(),
2475            vec![
2476                Event::Request(Request {
2477                    method: b"GET".to_vec(),
2478                    target: b"/3".to_vec(),
2479                    headers: vec![(b"Host".to_vec(), b"a.com".to_vec())].into(),
2480                    http_version: b"1.1".to_vec(),
2481                }),
2482                Event::EndOfMessage(EndOfMessage::default()),
2483            ],
2484        );
2485        // Doesn't pause this time, no trailing data
2486        assert_eq!(c.next_event().unwrap(), Event::NeedData {});
2487        c.send(
2488            Response {
2489                status_code: 200,
2490                headers: vec![].into(),
2491                http_version: b"1.1".to_vec(),
2492                reason: b"".to_vec(),
2493            }
2494            .into(),
2495        )
2496        .unwrap();
2497        c.send(EndOfMessage::default().into()).unwrap();
2498
2499        // Arrival of more data triggers pause
2500        assert_eq!(c.next_event().unwrap(), Event::NeedData {});
2501        c.receive_data(b"SADF").unwrap();
2502        assert_eq!(c.next_event().unwrap(), Event::Paused {});
2503        assert_eq!(c.get_trailing_data(), (b"SADF".to_vec(), false));
2504        // If EOF arrives while paused, we don't see that either:
2505        c.receive_data(b"").unwrap();
2506        assert_eq!(c.get_trailing_data(), (b"SADF".to_vec(), true));
2507        assert_eq!(c.next_event().unwrap(), Event::Paused {});
2508        c.receive_data(b"").unwrap();
2509        assert_eq!(c.next_event().unwrap(), Event::Paused {});
2510    }
2511
2512    // def test_protocol_switch() -> None:
2513    //     for req, deny, accept in [
2514    //         (
2515    //             Request(
2516    //                 method="CONNECT",
2517    //                 target="example.com:443",
2518    //                 headers=[("Host", "foo"), ("Content-Length", "1")],
2519    //             ),
2520    //             Response(status_code=404, headers=[(b"transfer-encoding", b"chunked")]),
2521    //             Response(status_code=200, headers=[(b"transfer-encoding", b"chunked")]),
2522    //         ),
2523    //         (
2524    //             Request(
2525    //                 method="GET",
2526    //                 target="/",
2527    //                 headers=[("Host", "foo"), ("Content-Length", "1"), ("Upgrade", "a, b")],
2528    //             ),
2529    //             Response(status_code=200, headers=[(b"transfer-encoding", b"chunked")]),
2530    //             InformationalResponse(status_code=101, headers=[("Upgrade", "a")]),
2531    //         ),
2532    //         (
2533    //             Request(
2534    //                 method="CONNECT",
2535    //                 target="example.com:443",
2536    //                 headers=[("Host", "foo"), ("Content-Length", "1"), ("Upgrade", "a, b")],
2537    //             ),
2538    //             Response(status_code=404, headers=[(b"transfer-encoding", b"chunked")]),
2539    //             # Accept CONNECT, not upgrade
2540    //             Response(status_code=200, headers=[(b"transfer-encoding", b"chunked")]),
2541    //         ),
2542    //         (
2543    //             Request(
2544    //                 method="CONNECT",
2545    //                 target="example.com:443",
2546    //                 headers=[("Host", "foo"), ("Content-Length", "1"), ("Upgrade", "a, b")],
2547    //             ),
2548    //             Response(status_code=404, headers=[(b"transfer-encoding", b"chunked")]),
2549    //             # Accept Upgrade, not CONNECT
2550    //             InformationalResponse(status_code=101, headers=[("Upgrade", "b")]),
2551    //         ),
2552    //     ]:
2553
2554    //         def setup() -> ConnectionPair:
2555    //             p = ConnectionPair()
2556    //             p.send(CLIENT, req)
2557    //             # No switch-related state change stuff yet; the client has to
2558    //             # finish the request before that kicks in
2559    //             for conn in p.conns:
2560    //                 assert conn.states[CLIENT] is SEND_BODY
2561    //             p.send(CLIENT, [Data(data=b"1"), EndOfMessage()])
2562    //             for conn in p.conns:
2563    //                 assert conn.states[CLIENT] is MIGHT_SWITCH_PROTOCOL
2564    //             assert p.conn[SERVER].next_event() is PAUSED
2565    //             return p
2566
2567    //         # Test deny case
2568    //         p = setup()
2569    //         p.send(SERVER, deny)
2570    //         for conn in p.conns:
2571    //             assert conn.states == {CLIENT: DONE, SERVER: SEND_BODY}
2572    //         p.send(SERVER, EndOfMessage())
2573    //         # Check that re-use is still allowed after a denial
2574    //         for conn in p.conns:
2575    //             conn.start_next_cycle()
2576
2577    //         # Test accept case
2578    //         p = setup()
2579    //         p.send(SERVER, accept)
2580    //         for conn in p.conns:
2581    //             assert conn.states == {CLIENT: SWITCHED_PROTOCOL, SERVER: SWITCHED_PROTOCOL}
2582    //             conn.receive_data(b"123")
2583    //             assert conn.next_event() is PAUSED
2584    //             conn.receive_data(b"456")
2585    //             assert conn.next_event() is PAUSED
2586    //             assert conn.trailing_data == (b"123456", False)
2587
2588    //         # Pausing in might-switch, then recovery
2589    //         # (weird artificial case where the trailing data actually is valid
2590    //         # HTTP for some reason, because this makes it easier to test the state
2591    //         # logic)
2592    //         p = setup()
2593    //         sc = p.conn[SERVER]
2594    //         sc.receive_data(b"GET / HTTP/1.0\r\n\r\n")
2595    //         assert sc.next_event() is PAUSED
2596    //         assert sc.trailing_data == (b"GET / HTTP/1.0\r\n\r\n", False)
2597    //         sc.send(deny)
2598    //         assert sc.next_event() is PAUSED
2599    //         sc.send(EndOfMessage())
2600    //         sc.start_next_cycle()
2601    //         assert get_all_events(sc) == [
2602    //             Request(method="GET", target="/", headers=[], http_version="1.0"),  # type: ignore[arg-type]
2603    //             EndOfMessage(),
2604    //         ]
2605
2606    //         # When we're DONE, have no trailing data, and the connection gets
2607    //         # closed, we report ConnectionClosed(). When we're in might-switch or
2608    //         # switched, we don't.
2609    //         p = setup()
2610    //         sc = p.conn[SERVER]
2611    //         sc.receive_data(b"")
2612    //         assert sc.next_event() is PAUSED
2613    //         assert sc.trailing_data == (b"", True)
2614    //         p.send(SERVER, accept)
2615    //         assert sc.next_event() is PAUSED
2616
2617    //         p = setup()
2618    //         sc = p.conn[SERVER]
2619    //         sc.receive_data(b"")
2620    //         assert sc.next_event() is PAUSED
2621    //         sc.send(deny)
2622    //         assert sc.next_event() == ConnectionClosed()
2623
2624    //         # You can't send after switching protocols, or while waiting for a
2625    //         # protocol switch
2626    //         p = setup()
2627    //         with pytest.raises(LocalProtocolError):
2628    //             p.conn[CLIENT].send(
2629    //                 Request(method="GET", target="/", headers=[("Host", "a")])
2630    //             )
2631    //         p = setup()
2632    //         p.send(SERVER, accept)
2633    //         with pytest.raises(LocalProtocolError):
2634    //             p.conn[SERVER].send(Data(data=b"123"))
2635
2636    #[test]
2637    fn test_protocol_switch() {
2638        for (req, deny, accept) in vec![
2639            (
2640                Request::new(
2641                    b"CONNECT".to_vec(),
2642                    vec![
2643                        (b"Host".to_vec(), b"foo".to_vec()),
2644                        (b"Content-Length".to_vec(), b"1".to_vec()),
2645                    ]
2646                    .into(),
2647                    b"example.com:443".to_vec(),
2648                    b"1.1".to_vec(),
2649                )
2650                .unwrap()
2651                .into(),
2652                Response {
2653                    status_code: 404,
2654                    headers: vec![(b"transfer-encoding".to_vec(), b"chunked".to_vec())].into(),
2655                    http_version: b"1.1".to_vec(),
2656                    reason: b"".to_vec(),
2657                }
2658                .into(),
2659                Response {
2660                    status_code: 200,
2661                    headers: vec![(b"transfer-encoding".to_vec(), b"chunked".to_vec())].into(),
2662                    http_version: b"1.1".to_vec(),
2663                    reason: b"".to_vec(),
2664                }
2665                .into(),
2666            ),
2667            (
2668                Request::new(
2669                    b"GET".to_vec(),
2670                    vec![
2671                        (b"Host".to_vec(), b"foo".to_vec()),
2672                        (b"Content-Length".to_vec(), b"1".to_vec()),
2673                        (b"Upgrade".to_vec(), b"a, b".to_vec()),
2674                    ]
2675                    .into(),
2676                    b"/".to_vec(),
2677                    b"1.1".to_vec(),
2678                )
2679                .unwrap()
2680                .into(),
2681                Response {
2682                    status_code: 200,
2683                    headers: vec![(b"transfer-encoding".to_vec(), b"chunked".to_vec())].into(),
2684                    http_version: b"1.1".to_vec(),
2685                    reason: b"".to_vec(),
2686                }
2687                .into(),
2688                Response {
2689                    status_code: 101,
2690                    headers: vec![(b"Upgrade".to_vec(), b"a".to_vec())].into(),
2691                    http_version: b"1.1".to_vec(),
2692                    reason: b"".to_vec(),
2693                }
2694                .into(),
2695            ),
2696            (
2697                Request::new(
2698                    b"CONNECT".to_vec(),
2699                    vec![
2700                        (b"Host".to_vec(), b"foo".to_vec()),
2701                        (b"Content-Length".to_vec(), b"1".to_vec()),
2702                        (b"Upgrade".to_vec(), b"a, b".to_vec()),
2703                    ]
2704                    .into(),
2705                    b"example.com:443".to_vec(),
2706                    b"1.1".to_vec(),
2707                )
2708                .unwrap()
2709                .into(),
2710                Response {
2711                    status_code: 404,
2712                    headers: vec![(b"transfer-encoding".to_vec(), b"chunked".to_vec())].into(),
2713                    http_version: b"1.1".to_vec(),
2714                    reason: b"".to_vec(),
2715                }
2716                .into(),
2717                // Accept CONNECT, not upgrade
2718                Response {
2719                    status_code: 200,
2720                    headers: vec![(b"transfer-encoding".to_vec(), b"chunked".to_vec())].into(),
2721                    http_version: b"1.1".to_vec(),
2722                    reason: b"".to_vec(),
2723                }
2724                .into(),
2725            ),
2726            (
2727                Request::new(
2728                    b"CONNECT".to_vec(),
2729                    vec![
2730                        (b"Host".to_vec(), b"foo".to_vec()),
2731                        (b"Content-Length".to_vec(), b"1".to_vec()),
2732                        (b"Upgrade".to_vec(), b"a, b".to_vec()),
2733                    ]
2734                    .into(),
2735                    b"example.com:443".to_vec(),
2736                    b"1.1".to_vec(),
2737                )
2738                .unwrap()
2739                .into(),
2740                Response {
2741                    status_code: 404,
2742                    headers: vec![(b"transfer-encoding".to_vec(), b"chunked".to_vec())].into(),
2743                    http_version: b"1.1".to_vec(),
2744                    reason: b"".to_vec(),
2745                }
2746                .into(),
2747                // Accept Upgrade, not CONNECT
2748                Response {
2749                    status_code: 101,
2750                    headers: vec![(b"Upgrade".to_vec(), b"b".to_vec())].into(),
2751                    http_version: b"1.1".to_vec(),
2752                    reason: b"".to_vec(),
2753                }
2754                .into(),
2755            ),
2756        ] {
2757            let req: Event = req;
2758            let deny: Event = deny;
2759            let accept: Event = accept;
2760            let setup = || {
2761                let mut p = ConnectionPair::new();
2762                p.send(Role::Client, vec![req.clone()], None).unwrap();
2763                // No switch-related state change stuff yet; the client has to
2764                // finish the request before that kicks in
2765                for (_, connection) in &mut p.conn {
2766                    assert_eq!(connection.get_states()[&Role::Client], State::SendBody);
2767                }
2768                p.send(
2769                    Role::Client,
2770                    vec![
2771                        Data {
2772                            data: b"1".to_vec(),
2773                            chunk_start: false,
2774                            chunk_end: false,
2775                        }
2776                        .into(),
2777                        EndOfMessage::default().into(),
2778                    ],
2779                    None,
2780                )
2781                .unwrap();
2782                for (_, connection) in &mut p.conn {
2783                    assert_eq!(
2784                        connection.get_states()[&Role::Client],
2785                        State::MightSwitchProtocol
2786                    );
2787                }
2788                assert_eq!(
2789                    p.conn.get_mut(&Role::Server).unwrap().next_event().unwrap(),
2790                    Event::Paused {}
2791                );
2792                return p;
2793            };
2794
2795            // Test deny case
2796            let mut p = setup();
2797            p.send(Role::Server, vec![deny.clone()], None).unwrap();
2798            for (_, connection) in &mut p.conn {
2799                assert_eq!(
2800                    connection.get_states(),
2801                    HashMap::from([(Role::Client, State::Done), (Role::Server, State::SendBody)])
2802                );
2803            }
2804            p.send(Role::Server, vec![EndOfMessage::default().into()], None)
2805                .unwrap();
2806            // Check that re-use is still allowed after a denial
2807            for (_, connection) in &mut p.conn {
2808                connection.start_next_cycle().unwrap();
2809            }
2810
2811            // Test accept case
2812            let mut p = setup();
2813            p.send(Role::Server, vec![accept.clone()], None).unwrap();
2814            for (_, connection) in &mut p.conn {
2815                assert_eq!(
2816                    connection.get_states(),
2817                    HashMap::from([
2818                        (Role::Client, State::SwitchedProtocol),
2819                        (Role::Server, State::SwitchedProtocol)
2820                    ])
2821                );
2822                connection.receive_data(b"123").unwrap();
2823                assert_eq!(connection.next_event().unwrap(), Event::Paused {});
2824                connection.receive_data(b"456").unwrap();
2825                assert_eq!(connection.next_event().unwrap(), Event::Paused {});
2826                assert_eq!(connection.get_trailing_data(), (b"123456".to_vec(), false));
2827            }
2828
2829            // Pausing in might-switch, then recovery
2830            // (weird artificial case where the trailing data actually is valid
2831            // HTTP for some reason, because this makes it easier to test the state
2832            // logic)
2833            let mut p = setup();
2834            let sc = p.conn.get_mut(&Role::Server).unwrap();
2835            sc.receive_data(b"GET / HTTP/1.0\r\n\r\n").unwrap();
2836            assert_eq!(sc.next_event().unwrap(), Event::Paused {});
2837            assert_eq!(
2838                sc.get_trailing_data(),
2839                (b"GET / HTTP/1.0\r\n\r\n".to_vec(), false)
2840            );
2841            sc.send(deny.clone()).unwrap();
2842            assert_eq!(sc.next_event().unwrap(), Event::Paused {});
2843            sc.send(EndOfMessage::default().into()).unwrap();
2844            sc.start_next_cycle().unwrap();
2845            assert_eq!(
2846                get_all_events(sc).unwrap(),
2847                vec![
2848                    Event::Request(Request {
2849                        method: b"GET".to_vec(),
2850                        target: b"/".to_vec(),
2851                        headers: vec![].into(),
2852                        http_version: b"1.0".to_vec(),
2853                    }),
2854                    Event::EndOfMessage(EndOfMessage::default()),
2855                ],
2856            );
2857
2858            // When we're DONE, have no trailing data, and the connection gets
2859            // closed, we report ConnectionClosed(). When we're in might-switch or
2860            // switched, we don't.
2861            let mut p = setup();
2862            {
2863                let sc = (p.conn).get_mut(&Role::Server).unwrap();
2864                sc.receive_data(b"").unwrap();
2865                assert_eq!(sc.next_event().unwrap(), Event::Paused {});
2866                assert_eq!(sc.get_trailing_data(), (b"".to_vec(), true));
2867            }
2868            p.send(Role::Server, vec![accept.clone()], None).unwrap();
2869            assert_eq!(
2870                (p.conn)
2871                    .get_mut(&Role::Server)
2872                    .unwrap()
2873                    .next_event()
2874                    .unwrap(),
2875                Event::Paused {}
2876            );
2877
2878            let mut p = setup();
2879            let sc = p.conn.get_mut(&Role::Server).unwrap();
2880            sc.receive_data(b"").unwrap();
2881            assert_eq!(sc.next_event().unwrap(), Event::Paused {});
2882            sc.send(deny).unwrap();
2883            assert_eq!(
2884                sc.next_event().unwrap(),
2885                Event::ConnectionClosed(ConnectionClosed::default())
2886            );
2887
2888            // You can't send after switching protocols, or while waiting for a
2889            // protocol switch
2890            let mut p = setup();
2891            let cc = p.conn.get_mut(&Role::Client).unwrap();
2892            assert!(match cc.send(
2893                Request::new(
2894                    b"GET".to_vec(),
2895                    vec![(b"Host".to_vec(), b"a".to_vec())].into(),
2896                    b"/".to_vec(),
2897                    b"1.1".to_vec(),
2898                )
2899                .unwrap()
2900                .into(),
2901            ) {
2902                Err(ProtocolError::LocalProtocolError(_)) => true,
2903                _ => false,
2904            });
2905            let mut p = setup();
2906            p.send(Role::Server, vec![accept], None).unwrap();
2907            let cc = p.conn.get_mut(&Role::Client).unwrap();
2908            assert!(match cc.send(
2909                Data {
2910                    data: b"123".to_vec(),
2911                    chunk_start: false,
2912                    chunk_end: false,
2913                }
2914                .into(),
2915            ) {
2916                Err(ProtocolError::LocalProtocolError(_)) => true,
2917                _ => false,
2918            });
2919        }
2920    }
2921
2922    // def test_close_simple() -> None:
2923    //     # Just immediately closing a new connection without anything having
2924    //     # happened yet.
2925    //     for who_shot_first, who_shot_second in [(CLIENT, SERVER), (SERVER, CLIENT)]:
2926
2927    //         def setup() -> ConnectionPair:
2928    //             p = ConnectionPair()
2929    //             p.send(who_shot_first, ConnectionClosed())
2930    //             for conn in p.conns:
2931    //                 assert conn.states == {
2932    //                     who_shot_first: CLOSED,
2933    //                     who_shot_second: MUST_CLOSE,
2934    //                 }
2935    //             return p
2936
2937    //         # You can keep putting b"" into a closed connection, and you keep
2938    //         # getting ConnectionClosed() out:
2939    //         p = setup()
2940    //         assert p.conn[who_shot_second].next_event() == ConnectionClosed()
2941    //         assert p.conn[who_shot_second].next_event() == ConnectionClosed()
2942    //         p.conn[who_shot_second].receive_data(b"")
2943    //         assert p.conn[who_shot_second].next_event() == ConnectionClosed()
2944    //         # Second party can close...
2945    //         p = setup()
2946    //         p.send(who_shot_second, ConnectionClosed())
2947    //         for conn in p.conns:
2948    //             assert conn.our_state is CLOSED
2949    //             assert conn.their_state is CLOSED
2950    //         # But trying to receive new data on a closed connection is a
2951    //         # RuntimeError (not ProtocolError, because the problem here isn't
2952    //         # violation of HTTP, it's violation of physics)
2953    //         p = setup()
2954    //         with pytest.raises(RuntimeError):
2955    //             p.conn[who_shot_second].receive_data(b"123")
2956    //         # And receiving new data on a MUST_CLOSE connection is a ProtocolError
2957    //         p = setup()
2958    //         p.conn[who_shot_first].receive_data(b"GET")
2959    //         with pytest.raises(RemoteProtocolError):
2960    //             p.conn[who_shot_first].next_event()
2961
2962    #[test]
2963    fn test_close_simple() {
2964        // Just immediately closing a new connection without anything having
2965        // happened yet.
2966        for (who_shot_first, who_shot_second) in
2967            vec![(Role::Client, Role::Server), (Role::Server, Role::Client)]
2968        {
2969            let setup = || {
2970                let mut p = ConnectionPair::new();
2971                p.send(
2972                    who_shot_first,
2973                    vec![ConnectionClosed::default().into()],
2974                    None,
2975                )
2976                .unwrap();
2977                for (_, connection) in &mut p.conn {
2978                    assert_eq!(
2979                        connection.get_states(),
2980                        HashMap::from([
2981                            (who_shot_first, State::Closed),
2982                            (who_shot_second, State::MustClose)
2983                        ])
2984                    );
2985                }
2986                return p;
2987            };
2988
2989            // You can keep putting b"" into a closed connection, and you keep
2990            // getting ConnectionClosed() out:
2991            let mut p = setup();
2992            assert_eq!(
2993                p.conn
2994                    .get_mut(&who_shot_second)
2995                    .unwrap()
2996                    .next_event()
2997                    .unwrap(),
2998                Event::ConnectionClosed(ConnectionClosed::default())
2999            );
3000            assert_eq!(
3001                p.conn
3002                    .get_mut(&who_shot_second)
3003                    .unwrap()
3004                    .next_event()
3005                    .unwrap(),
3006                Event::ConnectionClosed(ConnectionClosed::default())
3007            );
3008            p.conn
3009                .get_mut(&who_shot_second)
3010                .unwrap()
3011                .receive_data(b"")
3012                .unwrap();
3013            assert_eq!(
3014                p.conn
3015                    .get_mut(&who_shot_second)
3016                    .unwrap()
3017                    .next_event()
3018                    .unwrap(),
3019                Event::ConnectionClosed(ConnectionClosed::default())
3020            );
3021            // Second party can close...
3022            let mut p = setup();
3023            p.send(
3024                who_shot_second,
3025                vec![ConnectionClosed::default().into()],
3026                None,
3027            )
3028            .unwrap();
3029            for (_, connection) in &mut p.conn {
3030                assert_eq!(connection.get_our_state(), State::Closed);
3031                assert_eq!(connection.get_their_state(), State::Closed);
3032            }
3033            // But trying to receive new data on a closed connection is a
3034            // RuntimeError (not ProtocolError, because the problem here isn't
3035            // violation of HTTP, it's violation of physics)
3036            let mut p = setup();
3037            assert!(match p
3038                .conn
3039                .get_mut(&who_shot_second)
3040                .unwrap()
3041                .receive_data(b"123")
3042            {
3043                Err(_) => true,
3044                _ => false,
3045            });
3046            // And receiving new data on a MUST_CLOSE connection is a ProtocolError
3047            let mut p = setup();
3048            p.conn
3049                .get_mut(&who_shot_first)
3050                .unwrap()
3051                .receive_data(b"GET")
3052                .unwrap();
3053            assert!(match p
3054                .conn
3055                .get_mut(&who_shot_first)
3056                .unwrap()
3057                .next_event()
3058                .unwrap_err()
3059            {
3060                ProtocolError::RemoteProtocolError(_) => true,
3061                _ => false,
3062            });
3063        }
3064    }
3065
3066    // def test_close_different_states() -> None:
3067    //     req = [
3068    //         Request(method="GET", target="/foo", headers=[("Host", "a")]),
3069    //         EndOfMessage(),
3070    //     ]
3071    //     resp = [
3072    //         Response(status_code=200, headers=[(b"transfer-encoding", b"chunked")]),
3073    //         EndOfMessage(),
3074    //     ]
3075
3076    //     # Client before request
3077    //     p = ConnectionPair()
3078    //     p.send(CLIENT, ConnectionClosed())
3079    //     for conn in p.conns:
3080    //         assert conn.states == {CLIENT: CLOSED, SERVER: MUST_CLOSE}
3081
3082    //     # Client after request
3083    //     p = ConnectionPair()
3084    //     p.send(CLIENT, req)
3085    //     p.send(CLIENT, ConnectionClosed())
3086    //     for conn in p.conns:
3087    //         assert conn.states == {CLIENT: CLOSED, SERVER: SEND_RESPONSE}
3088
3089    //     # Server after request -> not allowed
3090    //     p = ConnectionPair()
3091    //     p.send(CLIENT, req)
3092    //     with pytest.raises(LocalProtocolError):
3093    //         p.conn[SERVER].send(ConnectionClosed())
3094    //     p.conn[CLIENT].receive_data(b"")
3095    //     with pytest.raises(RemoteProtocolError):
3096    //         p.conn[CLIENT].next_event()
3097
3098    //     # Server after response
3099    //     p = ConnectionPair()
3100    //     p.send(CLIENT, req)
3101    //     p.send(SERVER, resp)
3102    //     p.send(SERVER, ConnectionClosed())
3103    //     for conn in p.conns:
3104    //         assert conn.states == {CLIENT: MUST_CLOSE, SERVER: CLOSED}
3105
3106    //     # Both after closing (ConnectionClosed() is idempotent)
3107    //     p = ConnectionPair()
3108    //     p.send(CLIENT, req)
3109    //     p.send(SERVER, resp)
3110    //     p.send(CLIENT, ConnectionClosed())
3111    //     p.send(SERVER, ConnectionClosed())
3112    //     p.send(CLIENT, ConnectionClosed())
3113    //     p.send(SERVER, ConnectionClosed())
3114
3115    //     # In the middle of sending -> not allowed
3116    //     p = ConnectionPair()
3117    //     p.send(
3118    //         CLIENT,
3119    //         Request(
3120    //             method="GET", target="/", headers=[("Host", "a"), ("Content-Length", "10")]
3121    //         ),
3122    //     )
3123    //     with pytest.raises(LocalProtocolError):
3124    //         p.conn[CLIENT].send(ConnectionClosed())
3125    //     p.conn[SERVER].receive_data(b"")
3126    //     with pytest.raises(RemoteProtocolError):
3127    //         p.conn[SERVER].next_event()
3128
3129    #[test]
3130    fn test_close_different_states() {
3131        let req: Vec<Event> = vec![
3132            Request::new(
3133                b"GET".to_vec(),
3134                vec![(b"Host".to_vec(), b"a".to_vec())].into(),
3135                b"/foo".to_vec(),
3136                b"1.1".to_vec(),
3137            )
3138            .unwrap()
3139            .into(),
3140            EndOfMessage::default().into(),
3141        ];
3142        let resp: Vec<Event> = vec![
3143            Response {
3144                status_code: 200,
3145                headers: vec![(b"transfer-encoding".to_vec(), b"chunked".to_vec())].into(),
3146                http_version: b"1.1".to_vec(),
3147                reason: b"".to_vec(),
3148            }
3149            .into(),
3150            EndOfMessage::default().into(),
3151        ];
3152
3153        // Client before request
3154        let mut p = ConnectionPair::new();
3155        p.send(Role::Client, vec![ConnectionClosed::default().into()], None)
3156            .unwrap();
3157        for (_, connection) in &mut p.conn {
3158            assert_eq!(
3159                connection.get_states(),
3160                HashMap::from([
3161                    (Role::Client, State::Closed),
3162                    (Role::Server, State::MustClose)
3163                ])
3164            );
3165        }
3166
3167        // Client after request
3168        let mut p = ConnectionPair::new();
3169        p.send(Role::Client, req.clone(), None).unwrap();
3170        p.send(Role::Client, vec![ConnectionClosed::default().into()], None)
3171            .unwrap();
3172        for (_, connection) in &mut p.conn {
3173            assert_eq!(
3174                connection.get_states(),
3175                HashMap::from([
3176                    (Role::Client, State::Closed),
3177                    (Role::Server, State::SendResponse)
3178                ])
3179            );
3180        }
3181
3182        // Server after request -> not allowed
3183        let mut p = ConnectionPair::new();
3184        p.send(Role::Client, req.clone(), None).unwrap();
3185        assert!(match p
3186            .conn
3187            .get_mut(&Role::Server)
3188            .unwrap()
3189            .send(ConnectionClosed::default().into())
3190        {
3191            Err(ProtocolError::LocalProtocolError(_)) => true,
3192            _ => false,
3193        });
3194        p.conn
3195            .get_mut(&Role::Client)
3196            .unwrap()
3197            .receive_data(b"")
3198            .unwrap();
3199        assert!(match p
3200            .conn
3201            .get_mut(&Role::Client)
3202            .unwrap()
3203            .next_event()
3204            .unwrap_err()
3205        {
3206            ProtocolError::RemoteProtocolError(_) => true,
3207            ProtocolError::LocalProtocolError(m) => panic!("{:?}", m),
3208        });
3209
3210        // Server after response
3211        let mut p = ConnectionPair::new();
3212        p.send(Role::Client, req.clone(), None).unwrap();
3213        p.send(Role::Server, resp.clone(), None).unwrap();
3214        p.send(Role::Server, vec![ConnectionClosed::default().into()], None)
3215            .unwrap();
3216        for (_, connection) in &mut p.conn {
3217            assert_eq!(
3218                connection.get_states(),
3219                HashMap::from([
3220                    (Role::Client, State::MustClose),
3221                    (Role::Server, State::Closed)
3222                ])
3223            );
3224        }
3225
3226        // Both after closing (ConnectionClosed() is idempotent)
3227        let mut p = ConnectionPair::new();
3228        p.send(Role::Client, req.clone(), None).unwrap();
3229        p.send(Role::Server, resp.clone(), None).unwrap();
3230        p.send(Role::Client, vec![ConnectionClosed::default().into()], None)
3231            .unwrap();
3232        p.send(Role::Server, vec![ConnectionClosed::default().into()], None)
3233            .unwrap();
3234        p.send(Role::Client, vec![ConnectionClosed::default().into()], None)
3235            .unwrap();
3236        p.send(Role::Server, vec![ConnectionClosed::default().into()], None)
3237            .unwrap();
3238
3239        // In the middle of sending -> not allowed
3240        let mut p = ConnectionPair::new();
3241        p.send(
3242            Role::Client,
3243            vec![Request::new(
3244                b"GET".to_vec(),
3245                vec![
3246                    (b"Host".to_vec(), b"a".to_vec()),
3247                    (b"Content-Length".to_vec(), b"10".to_vec()),
3248                ]
3249                .into(),
3250                b"/".to_vec(),
3251                b"1.1".to_vec(),
3252            )
3253            .unwrap()
3254            .into()],
3255            None,
3256        )
3257        .unwrap();
3258        assert!(match p
3259            .conn
3260            .get_mut(&Role::Client)
3261            .unwrap()
3262            .send(ConnectionClosed::default().into())
3263        {
3264            Err(ProtocolError::LocalProtocolError(_)) => true,
3265            _ => false,
3266        });
3267        p.conn
3268            .get_mut(&Role::Server)
3269            .unwrap()
3270            .receive_data(b"")
3271            .unwrap();
3272        assert!(match p
3273            .conn
3274            .get_mut(&Role::Server)
3275            .unwrap()
3276            .next_event()
3277            .unwrap_err()
3278        {
3279            ProtocolError::RemoteProtocolError(_) => true,
3280            _ => false,
3281        });
3282    }
3283
3284    // # Receive several requests and then client shuts down their side of the
3285    // # connection; we can respond to each
3286    // def test_pipelined_close() -> None:
3287    //     c = Connection(SERVER)
3288    //     # 2 requests then a close
3289    //     c.receive_data(
3290    //         b"GET /1 HTTP/1.1\r\nHost: a.com\r\nContent-Length: 5\r\n\r\n"
3291    //         b"12345"
3292    //         b"GET /2 HTTP/1.1\r\nHost: a.com\r\nContent-Length: 5\r\n\r\n"
3293    //         b"67890"
3294    //     )
3295    //     c.receive_data(b"")
3296    //     assert get_all_events(c) == [
3297    //         Request(
3298    //             method="GET",
3299    //             target="/1",
3300    //             headers=[("host", "a.com"), ("content-length", "5")],
3301    //         ),
3302    //         Data(data=b"12345"),
3303    //         EndOfMessage(),
3304    //     ]
3305    //     assert c.states[CLIENT] is DONE
3306    //     c.send(Response(status_code=200, headers=[]))  # type: ignore[arg-type]
3307    //     c.send(EndOfMessage())
3308    //     assert c.states[SERVER] is DONE
3309    //     c.start_next_cycle()
3310    //     assert get_all_events(c) == [
3311    //         Request(
3312    //             method="GET",
3313    //             target="/2",
3314    //             headers=[("host", "a.com"), ("content-length", "5")],
3315    //         ),
3316    //         Data(data=b"67890"),
3317    //         EndOfMessage(),
3318    //         ConnectionClosed(),
3319    //     ]
3320    //     assert c.states == {CLIENT: CLOSED, SERVER: SEND_RESPONSE}
3321    //     c.send(Response(status_code=200, headers=[]))  # type: ignore[arg-type]
3322    //     c.send(EndOfMessage())
3323    //     assert c.states == {CLIENT: CLOSED, SERVER: MUST_CLOSE}
3324    //     c.send(ConnectionClosed())
3325    //     assert c.states == {CLIENT: CLOSED, SERVER: CLOSED}
3326
3327    #[test]
3328    fn test_pipelined_close() {
3329        let mut c = Connection::new(Role::Server, None);
3330        // 2 requests then a close
3331        c.receive_data(
3332            &vec![
3333                b"GET /1 HTTP/1.1\r\nHost: a.com\r\nContent-Length: 5\r\n\r\n".to_vec(),
3334                b"12345".to_vec(),
3335                b"GET /2 HTTP/1.1\r\nHost: a.com\r\nContent-Length: 5\r\n\r\n".to_vec(),
3336                b"67890".to_vec(),
3337            ]
3338            .into_iter()
3339            .flatten()
3340            .collect::<Vec<u8>>(),
3341        )
3342        .unwrap();
3343        c.receive_data(b"").unwrap();
3344        assert_eq!(
3345            get_all_events(&mut c).unwrap(),
3346            vec![
3347                Event::Request(Request {
3348                    method: b"GET".to_vec(),
3349                    target: b"/1".to_vec(),
3350                    headers: vec![
3351                        (b"Host".to_vec(), b"a.com".to_vec()),
3352                        (b"Content-Length".to_vec(), b"5".to_vec())
3353                    ]
3354                    .into(),
3355                    http_version: b"1.1".to_vec(),
3356                }),
3357                Event::Data(Data {
3358                    data: b"12345".to_vec(),
3359                    chunk_start: false,
3360                    chunk_end: false,
3361                }),
3362                Event::EndOfMessage(EndOfMessage::default()),
3363            ],
3364        );
3365        assert_eq!(c.get_their_state(), State::Done);
3366
3367        c.send(
3368            Response {
3369                status_code: 200,
3370                headers: vec![].into(),
3371                http_version: b"1.1".to_vec(),
3372                reason: b"".to_vec(),
3373            }
3374            .into(),
3375        )
3376        .unwrap();
3377        c.send(EndOfMessage::default().into()).unwrap();
3378        assert_eq!(c.get_our_state(), State::Done);
3379
3380        c.start_next_cycle().unwrap();
3381        assert_eq!(
3382            get_all_events(&mut c).unwrap(),
3383            vec![
3384                Event::Request(Request {
3385                    method: b"GET".to_vec(),
3386                    target: b"/2".to_vec(),
3387                    headers: vec![
3388                        (b"Host".to_vec(), b"a.com".to_vec()),
3389                        (b"Content-Length".to_vec(), b"5".to_vec())
3390                    ]
3391                    .into(),
3392                    http_version: b"1.1".to_vec(),
3393                }),
3394                Event::Data(Data {
3395                    data: b"67890".to_vec(),
3396                    chunk_start: false,
3397                    chunk_end: false,
3398                }),
3399                Event::EndOfMessage(EndOfMessage::default()),
3400                Event::ConnectionClosed(ConnectionClosed::default()),
3401            ],
3402        );
3403        assert_eq!(c.get_their_state(), State::Closed);
3404        assert_eq!(c.get_our_state(), State::SendResponse);
3405        c.send(
3406            Response {
3407                status_code: 200,
3408                headers: vec![].into(),
3409                http_version: b"1.1".to_vec(),
3410                reason: b"".to_vec(),
3411            }
3412            .into(),
3413        )
3414        .unwrap();
3415        c.send(EndOfMessage::default().into()).unwrap();
3416        assert_eq!(c.get_their_state(), State::Closed);
3417        assert_eq!(c.get_our_state(), State::MustClose);
3418        c.send(ConnectionClosed::default().into()).unwrap();
3419        assert_eq!(c.get_their_state(), State::Closed);
3420        assert_eq!(c.get_our_state(), State::Closed);
3421    }
3422
3423    // def test_errors() -> None:
3424    //     # After a receive error, you can't receive
3425    //     for role in [CLIENT, SERVER]:
3426    //         c = Connection(our_role=role)
3427    //         c.receive_data(b"gibberish\r\n\r\n")
3428    //         with pytest.raises(RemoteProtocolError):
3429    //             c.next_event()
3430    //         # Now any attempt to receive continues to raise
3431    //         assert c.their_state is ERROR
3432    //         assert c.our_state is not ERROR
3433    //         print(c._cstate.states)
3434    //         with pytest.raises(RemoteProtocolError):
3435    //             c.next_event()
3436    //         # But we can still yell at the client for sending us gibberish
3437    //         if role is SERVER:
3438    //             assert (
3439    //                 c.send(Response(status_code=400, headers=[]))  # type: ignore[arg-type]
3440    //                 == b"HTTP/1.1 400 \r\nConnection: close\r\n\r\n"
3441    //             )
3442
3443    //     # After an error sending, you can no longer send
3444    //     # (This is especially important for things like content-length errors,
3445    //     # where there's complex internal state being modified)
3446    //     def conn(role: Type[Sentinel]) -> Connection:
3447    //         c = Connection(our_role=role)
3448    //         if role is SERVER:
3449    //             # Put it into the state where it *could* send a response...
3450    //             receive_and_get(c, b"GET / HTTP/1.0\r\n\r\n")
3451    //             assert c.our_state is SEND_RESPONSE
3452    //         return c
3453
3454    //     for role in [CLIENT, SERVER]:
3455    //         if role is CLIENT:
3456    //             # This HTTP/1.0 request won't be detected as bad until after we go
3457    //             # through the state machine and hit the writing code
3458    //             good = Request(method="GET", target="/", headers=[("Host", "example.com")])
3459    //             bad = Request(
3460    //                 method="GET",
3461    //                 target="/",
3462    //                 headers=[("Host", "example.com")],
3463    //                 http_version="1.0",
3464    //             )
3465    //         elif role is SERVER:
3466    //             good = Response(status_code=200, headers=[])  # type: ignore[arg-type,assignment]
3467    //             bad = Response(status_code=200, headers=[], http_version="1.0")  # type: ignore[arg-type,assignment]
3468    //         # Make sure 'good' actually is good
3469    //         c = conn(role)
3470    //         c.send(good)
3471    //         assert c.our_state is not ERROR
3472    //         # Do that again, but this time sending 'bad' first
3473    //         c = conn(role)
3474    //         with pytest.raises(LocalProtocolError):
3475    //             c.send(bad)
3476    //         assert c.our_state is ERROR
3477    //         assert c.their_state is not ERROR
3478    //         # Now 'good' is not so good
3479    //         with pytest.raises(LocalProtocolError):
3480    //             c.send(good)
3481
3482    //         # And check send_failed() too
3483    //         c = conn(role)
3484    //         c.send_failed()
3485    //         assert c.our_state is ERROR
3486    //         assert c.their_state is not ERROR
3487    //         # This is idempotent
3488    //         c.send_failed()
3489    //         assert c.our_state is ERROR
3490    //         assert c.their_state is not ERROR
3491
3492    #[test]
3493    fn test_errors() {
3494        // After a receive error, you can't receive
3495        for role in vec![Role::Client, Role::Server] {
3496            let mut c = Connection::new(role, None);
3497            c.receive_data(b"gibberish\r\n\r\n").unwrap();
3498            assert!(match c.next_event().unwrap_err() {
3499                ProtocolError::RemoteProtocolError(_) => true,
3500                _ => false,
3501            });
3502            // Now any attempt to receive continues to raise
3503            assert_eq!(c.get_their_state(), State::Error);
3504            assert_ne!(c.get_our_state(), State::Error);
3505            assert!(match c.next_event().unwrap_err() {
3506                ProtocolError::RemoteProtocolError(_) => true,
3507                _ => false,
3508            });
3509            // But we can still yell at the client for sending us gibberish
3510            if role == Role::Server {
3511                assert_eq!(
3512                    c.send(
3513                        Response {
3514                            status_code: 400,
3515                            headers: vec![].into(),
3516                            http_version: b"1.1".to_vec(),
3517                            reason: b"".to_vec(),
3518                        }
3519                        .into()
3520                    )
3521                    .unwrap()
3522                    .unwrap(),
3523                    b"HTTP/1.1 400 \r\nconnection: close\r\n\r\n".to_vec()
3524                );
3525            }
3526        }
3527
3528        // After an error sending, you can no longer send
3529        // (This is especially important for things like content-length errors,
3530        // where there's complex internal state being modified)
3531        let conn = |role: Role| -> Connection {
3532            let mut c = Connection::new(role, None);
3533            if role == Role::Server {
3534                // Put it into the state where it *could* send a response...
3535                receive_and_get(
3536                    &mut c,
3537                    &b"GET / HTTP/1.0\r\n\r\n"
3538                        .to_vec()
3539                        .into_iter()
3540                        .collect::<Vec<u8>>(),
3541                )
3542                .unwrap();
3543                assert_eq!(c.get_our_state(), State::SendResponse);
3544            }
3545            return c;
3546        };
3547
3548        for role in vec![Role::Client, Role::Server] {
3549            let (good, bad): (Event, Event) = if role == Role::Client {
3550                // This HTTP/1.0 request won't be detected as bad until after we go
3551                // through the state machine and hit the writing code
3552                (
3553                    Request::new(
3554                        b"GET".to_vec(),
3555                        vec![(b"Host".to_vec(), b"example.com".to_vec())].into(),
3556                        b"/".to_vec(),
3557                        b"1.1".to_vec(),
3558                    )
3559                    .unwrap()
3560                    .into(),
3561                    Request::new(
3562                        b"GET".to_vec(),
3563                        vec![(b"Host".to_vec(), b"example.com".to_vec())].into(),
3564                        b"/".to_vec(),
3565                        b"1.0".to_vec(),
3566                    )
3567                    .unwrap()
3568                    .into(),
3569                )
3570            } else {
3571                (
3572                    Response {
3573                        status_code: 200,
3574                        headers: vec![].into(),
3575                        http_version: b"1.1".to_vec(),
3576                        reason: b"".to_vec(),
3577                    }
3578                    .into(),
3579                    Response {
3580                        status_code: 200,
3581                        headers: vec![].into(),
3582                        http_version: b"1.0".to_vec(),
3583                        reason: b"".to_vec(),
3584                    }
3585                    .into(),
3586                )
3587            };
3588            // Make sure 'good' actually is good
3589            let mut c = conn(role);
3590            c.send(good.clone()).unwrap();
3591            assert_ne!(c.get_our_state(), State::Error);
3592            // Do that again, but this time sending 'bad' first
3593            let mut c = conn(role);
3594            assert!(match c.send(bad.clone()) {
3595                Err(ProtocolError::LocalProtocolError(_)) => true,
3596                _ => false,
3597            });
3598            assert_eq!(c.get_our_state(), State::Error);
3599            assert_ne!(c.get_their_state(), State::Error);
3600            // Now 'good' is not so good
3601            assert!(match c.send(good.clone()) {
3602                Err(ProtocolError::LocalProtocolError(_)) => true,
3603                _ => false,
3604            });
3605
3606            // And check send_failed() too
3607            let mut c = conn(role);
3608            c.send_failed();
3609            assert_eq!(c.get_our_state(), State::Error);
3610            assert_ne!(c.get_their_state(), State::Error);
3611            // This is idempotent
3612            c.send_failed();
3613            assert_eq!(c.get_our_state(), State::Error);
3614            assert_ne!(c.get_their_state(), State::Error);
3615        }
3616    }
3617
3618    #[test]
3619    fn test_send_wrong_event_type_returns_error_without_panic() {
3620        let mut c = Connection::new(Role::Client, None);
3621        assert!(matches!(
3622            c.send(Event::Data(Data {
3623                data: b"unexpected".to_vec(),
3624                ..Default::default()
3625            })),
3626            Err(ProtocolError::LocalProtocolError(_))
3627        ));
3628        assert_eq!(c.get_our_state(), State::Error);
3629    }
3630
3631    #[test]
3632    fn test_send_serialization_error_marks_our_state_error() {
3633        let mut c = Connection::new(Role::Client, None);
3634        let bad = Request::new(
3635            b"GET",
3636            Headers::new([("Host", "example.com")]).unwrap(),
3637            b"/",
3638            b"1.0",
3639        )
3640        .unwrap();
3641
3642        assert!(matches!(
3643            c.send(bad.into()),
3644            Err(ProtocolError::LocalProtocolError(_))
3645        ));
3646        assert_eq!(c.get_our_state(), State::Error);
3647        assert_ne!(c.get_their_state(), State::Error);
3648    }
3649
3650    #[test]
3651    fn test_receive_data_after_eof_uses_protocol_error() {
3652        let mut c = Connection::new(Role::Server, None);
3653        c.receive_data(b"").unwrap();
3654        assert!(matches!(
3655            c.receive_data(b"extra"),
3656            Err(ProtocolError::LocalProtocolError(_))
3657        ));
3658    }
3659
3660    #[test]
3661    fn test_non_utf8_content_length_returns_protocol_error() {
3662        let mut c = Connection::new(Role::Server, None);
3663        c.receive_data(b"GET / HTTP/1.1\r\nHost: example.com\r\nContent-Length: \xff\r\n\r\n")
3664            .unwrap();
3665        assert!(matches!(
3666            c.next_event(),
3667            Err(ProtocolError::RemoteProtocolError(_))
3668        ));
3669    }
3670
3671    // def test_idle_receive_nothing() -> None:
3672    //     # At one point this incorrectly raised an error
3673    //     for role in [CLIENT, SERVER]:
3674    //         c = Connection(role)
3675    //         assert c.next_event() is NEED_DATA
3676
3677    #[test]
3678    fn test_idle_receive_nothing() {
3679        // At one point this incorrectly raised an error
3680        for role in vec![Role::Client, Role::Server] {
3681            let mut c = Connection::new(role, None);
3682            assert_eq!(c.next_event().unwrap(), Event::NeedData {});
3683        }
3684    }
3685
3686    // def test_connection_drop() -> None:
3687    //     c = Connection(SERVER)
3688    //     c.receive_data(b"GET /")
3689    //     assert c.next_event() is NEED_DATA
3690    //     c.receive_data(b"")
3691    //     with pytest.raises(RemoteProtocolError):
3692    //         c.next_event()
3693
3694    #[test]
3695    fn test_connection_drop() {
3696        let mut c = Connection::new(Role::Server, None);
3697        c.receive_data(b"GET /").unwrap();
3698        assert_eq!(c.next_event().unwrap(), Event::NeedData {});
3699        c.receive_data(b"").unwrap();
3700        assert!(match c.next_event().unwrap_err() {
3701            ProtocolError::RemoteProtocolError(_) => true,
3702            _ => false,
3703        });
3704    }
3705
3706    // def test_408_request_timeout() -> None:
3707    //     # Should be able to send this spontaneously as a server without seeing
3708    //     # anything from client
3709    //     p = ConnectionPair()
3710    //     p.send(SERVER, Response(status_code=408, headers=[(b"connection", b"close")]))
3711
3712    #[test]
3713    fn test_408_request_timeout() {
3714        // Should be able to send this spontaneously as a server without seeing
3715        // anything from client
3716        let mut p = ConnectionPair::new();
3717        p.send(
3718            Role::Server,
3719            vec![Response {
3720                status_code: 408,
3721                headers: vec![(b"connection".to_vec(), b"close".to_vec())].into(),
3722                http_version: b"1.1".to_vec(),
3723                reason: b"".to_vec(),
3724            }
3725            .into()],
3726            None,
3727        )
3728        .unwrap();
3729    }
3730
3731    // # This used to raise IndexError
3732    // def test_empty_request() -> None:
3733    //     c = Connection(SERVER)
3734    //     c.receive_data(b"\r\n")
3735    //     with pytest.raises(RemoteProtocolError):
3736    //         c.next_event()
3737
3738    #[test]
3739    fn test_empty_request() {
3740        let mut c = Connection::new(Role::Server, None);
3741        c.receive_data(b"\r\n").unwrap();
3742        assert!(match c.next_event().unwrap_err() {
3743            ProtocolError::RemoteProtocolError(_) => true,
3744            _ => false,
3745        });
3746    }
3747
3748    // # This used to raise IndexError
3749    // def test_empty_response() -> None:
3750    //     c = Connection(CLIENT)
3751    //     c.send(Request(method="GET", target="/", headers=[("Host", "a")]))
3752    //     c.receive_data(b"\r\n")
3753    //     with pytest.raises(RemoteProtocolError):
3754    //         c.next_event()
3755
3756    #[test]
3757    fn test_empty_response() {
3758        let mut c = Connection::new(Role::Client, None);
3759        c.send(
3760            Request::new(
3761                b"GET".to_vec(),
3762                vec![(b"Host".to_vec(), b"a".to_vec())].into(),
3763                b"/".to_vec(),
3764                b"1.1".to_vec(),
3765            )
3766            .unwrap()
3767            .into(),
3768        )
3769        .unwrap();
3770        c.receive_data(b"\r\n").unwrap();
3771        assert!(match c.next_event().unwrap_err() {
3772            ProtocolError::RemoteProtocolError(_) => true,
3773            _ => false,
3774        });
3775    }
3776
3777    // @pytest.mark.parametrize(
3778    //     "data",
3779    //     [
3780    //         b"\x00",
3781    //         b"\x20",
3782    //         b"\x16\x03\x01\x00\xa5",  # Typical start of a TLS Client Hello
3783    //     ],
3784    // )
3785    // def test_early_detection_of_invalid_request(data: bytes) -> None:
3786    //     c = Connection(SERVER)
3787    //     # Early detection should occur before even receiving a `\r\n`
3788    //     c.receive_data(data)
3789    //     with pytest.raises(RemoteProtocolError):
3790    //         c.next_event()
3791
3792    #[test]
3793    fn test_early_detection_of_invalid_request() {
3794        let data = vec![
3795            b"\x00".to_vec(),
3796            b"\x20".to_vec(),
3797            b"\x16\x03\x01\x00\xa5".to_vec(), // Typical start of a TLS Client Hello
3798        ];
3799        for data in data {
3800            let mut c = Connection::new(Role::Server, None);
3801            // Early detection should occur before even receiving a `\r\n`
3802            c.receive_data(&data).unwrap();
3803            assert!(match c.next_event().unwrap_err() {
3804                ProtocolError::RemoteProtocolError(_) => true,
3805                _ => false,
3806            });
3807        }
3808    }
3809
3810    // @pytest.mark.parametrize(
3811    //     "data",
3812    //     [
3813    //         b"\x00",
3814    //         b"\x20",
3815    //         b"\x16\x03\x03\x00\x31",  # Typical start of a TLS Server Hello
3816    //     ],
3817    // )
3818    // def test_early_detection_of_invalid_response(data: bytes) -> None:
3819    //     c = Connection(CLIENT)
3820    //     # Early detection should occur before even receiving a `\r\n`
3821    //     c.receive_data(data)
3822    //     with pytest.raises(RemoteProtocolError):
3823    //         c.next_event()
3824
3825    #[test]
3826    fn test_early_detection_of_invalid_response() {
3827        let data = vec![
3828            b"\x00".to_vec(),
3829            b"\x20".to_vec(),
3830            b"\x16\x03\x03\x00\x31".to_vec(), // Typical start of a TLS Server Hello
3831        ];
3832        for data in data {
3833            let mut c = Connection::new(Role::Client, None);
3834            // Early detection should occur before even receiving a `\r\n`
3835            c.receive_data(&data).unwrap();
3836            assert!(match c.next_event().unwrap_err() {
3837                ProtocolError::RemoteProtocolError(_) => true,
3838                _ => false,
3839            });
3840        }
3841    }
3842
3843    // # This used to give different headers for HEAD and GET.
3844    // # The correct way to handle HEAD is to put whatever headers we *would* have
3845    // # put if it were a GET -- even though we know that for HEAD, those headers
3846    // # will be ignored.
3847    // def test_HEAD_framing_headers() -> None:
3848    //     def setup(method: bytes, http_version: bytes) -> Connection:
3849    //         c = Connection(SERVER)
3850    //         c.receive_data(
3851    //             method + b" / HTTP/" + http_version + b"\r\n" + b"Host: example.com\r\n\r\n"
3852    //         )
3853    //         assert type(c.next_event()) is Request
3854    //         assert type(c.next_event()) is EndOfMessage
3855    //         return c
3856
3857    //     for method in [b"GET", b"HEAD"]:
3858    //         # No Content-Length, HTTP/1.1 peer, should use chunked
3859    //         c = setup(method, b"1.1")
3860    //         assert (
3861    //             c.send(Response(status_code=200, headers=[])) == b"HTTP/1.1 200 \r\n"  # type: ignore[arg-type]
3862    //             b"Transfer-Encoding: chunked\r\n\r\n"
3863    //         )
3864
3865    //         # No Content-Length, HTTP/1.0 peer, frame with connection: close
3866    //         c = setup(method, b"1.0")
3867    //         assert (
3868    //             c.send(Response(status_code=200, headers=[])) == b"HTTP/1.1 200 \r\n"  # type: ignore[arg-type]
3869    //             b"Connection: close\r\n\r\n"
3870    //         )
3871
3872    //         # Content-Length + Transfer-Encoding, TE wins
3873    //         c = setup(method, b"1.1")
3874    //         assert (
3875    //             c.send(
3876    //                 Response(
3877    //                     status_code=200,
3878    //                     headers=[
3879    //                         ("Content-Length", "100"),
3880    //                         ("Transfer-Encoding", "chunked"),
3881    //                     ],
3882    //                 )
3883    //             )
3884    //             == b"HTTP/1.1 200 \r\n"
3885    //             b"Transfer-Encoding: chunked\r\n\r\n"
3886    //         )
3887
3888    #[test]
3889    fn test_head_framing_headers() {
3890        let setup = |method: &[u8], http_version: &[u8]| -> Connection {
3891            let mut c = Connection::new(Role::Server, None);
3892            c.receive_data(
3893                &vec![
3894                    method.to_vec(),
3895                    b" / HTTP/".to_vec(),
3896                    http_version.to_vec(),
3897                    b"\r\n".to_vec(),
3898                    b"Host: example.com\r\n\r\n".to_vec(),
3899                ]
3900                .into_iter()
3901                .flatten()
3902                .collect::<Vec<u8>>(),
3903            )
3904            .unwrap();
3905            assert!(match c.next_event().unwrap() {
3906                Event::Request(_) => true,
3907                _ => false,
3908            });
3909            assert!(match c.next_event().unwrap() {
3910                Event::EndOfMessage(_) => true,
3911                _ => false,
3912            });
3913            return c;
3914        };
3915
3916        for method in vec![b"GET".to_vec(), b"HEAD".to_vec()] {
3917            // No Content-Length, HTTP/1.1 peer, should use chunked
3918            let mut c = setup(&method, &b"1.1".to_vec());
3919            assert_eq!(
3920                c.send(
3921                    Response {
3922                        status_code: 200,
3923                        headers: vec![].into(),
3924                        http_version: b"1.1".to_vec(),
3925                        reason: b"".to_vec(),
3926                    }
3927                    .into()
3928                )
3929                .unwrap()
3930                .unwrap(),
3931                b"HTTP/1.1 200 \r\ntransfer-encoding: chunked\r\n\r\n".to_vec()
3932            );
3933
3934            // No Content-Length, HTTP/1.0 peer, frame with connection: close
3935            let mut c = setup(&method, &b"1.0".to_vec());
3936            assert_eq!(
3937                c.send(
3938                    Response {
3939                        status_code: 200,
3940                        headers: vec![(b"connection".to_vec(), b"close".to_vec())].into(),
3941                        http_version: b"1.1".to_vec(),
3942                        reason: b"".to_vec(),
3943                    }
3944                    .into()
3945                )
3946                .unwrap()
3947                .unwrap(),
3948                b"HTTP/1.1 200 \r\nconnection: close\r\n\r\n".to_vec()
3949            );
3950
3951            // Content-Length + Transfer-Encoding, TE wins
3952            let mut c = setup(&method, &b"1.1".to_vec());
3953            assert_eq!(
3954                c.send(
3955                    Response {
3956                        status_code: 200,
3957                        headers: vec![
3958                            (b"Content-Length".to_vec(), b"100".to_vec()),
3959                            (b"Transfer-Encoding".to_vec(), b"chunked".to_vec())
3960                        ]
3961                        .into(),
3962                        http_version: b"1.1".to_vec(),
3963                        reason: b"".to_vec(),
3964                    }
3965                    .into()
3966                )
3967                .unwrap()
3968                .unwrap(),
3969                b"HTTP/1.1 200 \r\ntransfer-encoding: chunked\r\n\r\n".to_vec()
3970            );
3971        }
3972    }
3973
3974    // def test_special_exceptions_for_lost_connection_in_message_body() -> None:
3975    //     c = Connection(SERVER)
3976    //     c.receive_data(
3977    //         b"POST / HTTP/1.1\r\n" b"Host: example.com\r\n" b"Content-Length: 100\r\n\r\n"
3978    //     )
3979    //     assert type(c.next_event()) is Request
3980    //     assert c.next_event() is NEED_DATA
3981    //     c.receive_data(b"12345")
3982    //     assert c.next_event() == Data(data=b"12345")
3983    //     c.receive_data(b"")
3984    //     with pytest.raises(RemoteProtocolError) as excinfo:
3985    //         c.next_event()
3986    //     assert "received 5 bytes" in str(excinfo.value)
3987    //     assert "expected 100" in str(excinfo.value)
3988
3989    //     c = Connection(SERVER)
3990    //     c.receive_data(
3991    //         b"POST / HTTP/1.1\r\n"
3992    //         b"Host: example.com\r\n"
3993    //         b"Transfer-Encoding: chunked\r\n\r\n"
3994    //     )
3995    //     assert type(c.next_event()) is Request
3996    //     assert c.next_event() is NEED_DATA
3997    //     c.receive_data(b"8\r\n012345")
3998    //     assert c.next_event().data == b"012345"  # type: ignore
3999    //     c.receive_data(b"")
4000    //     with pytest.raises(RemoteProtocolError) as excinfo:
4001    //         c.next_event()
4002    //     assert "incomplete chunked read" in str(excinfo.value)
4003
4004    #[test]
4005    fn test_special_exceptions_for_lost_connection_in_message_body() {
4006        let mut c = Connection::new(Role::Server, None);
4007        c.receive_data(
4008            &vec![
4009                b"POST / HTTP/1.1\r\n".to_vec(),
4010                b"Host: example.com\r\n".to_vec(),
4011                b"Content-Length: 100\r\n\r\n".to_vec(),
4012            ]
4013            .into_iter()
4014            .flatten()
4015            .collect::<Vec<u8>>(),
4016        )
4017        .unwrap();
4018        assert!(match c.next_event().unwrap() {
4019            Event::Request(_) => true,
4020            _ => false,
4021        });
4022        assert_eq!(c.next_event().unwrap(), Event::NeedData {});
4023        c.receive_data(b"12345").unwrap();
4024        assert_eq!(
4025            c.next_event().unwrap(),
4026            Event::Data(Data {
4027                data: b"12345".to_vec(),
4028                chunk_start: false,
4029                chunk_end: false,
4030            })
4031        );
4032        c.receive_data(b"").unwrap();
4033        assert!(match c.next_event().unwrap_err() {
4034            ProtocolError::RemoteProtocolError(_) => true,
4035            _ => false,
4036        });
4037
4038        let mut c = Connection::new(Role::Server, None);
4039        c.receive_data(
4040            &vec![
4041                b"POST / HTTP/1.1\r\n".to_vec(),
4042                b"Host: example.com\r\n".to_vec(),
4043                b"Transfer-Encoding: chunked\r\n\r\n".to_vec(),
4044            ]
4045            .into_iter()
4046            .flatten()
4047            .collect::<Vec<u8>>(),
4048        )
4049        .unwrap();
4050        assert!(match c.next_event().unwrap() {
4051            Event::Request(_) => true,
4052            _ => false,
4053        });
4054        assert_eq!(c.next_event().unwrap(), Event::NeedData {});
4055        c.receive_data(b"8\r\n012345").unwrap();
4056        assert_eq!(
4057            match c.next_event().unwrap() {
4058                Event::Data(d) => d.data,
4059                _ => panic!(),
4060            },
4061            b"012345".to_vec()
4062        );
4063        c.receive_data(b"").unwrap();
4064        assert!(match c.next_event().unwrap_err() {
4065            ProtocolError::RemoteProtocolError(_) => true,
4066            _ => false,
4067        });
4068    }
4069}