fluke/h2/
encode.rs

1use http::{StatusCode, Version};
2use tokio::sync::mpsc;
3use tracing::debug;
4
5use super::types::{H2Event, H2EventPayload};
6use crate::{h1::body::BodyWriteMode, Encoder, Response};
7use fluke_h2_parse::StreamId;
8
9pub(crate) enum EncoderState {
10    ExpectResponseHeaders,
11    ExpectResponseBody,
12    ResponseDone,
13}
14
15pub struct H2Encoder {
16    pub(crate) stream_id: StreamId,
17    pub(crate) tx: mpsc::Sender<H2Event>,
18    pub(crate) state: EncoderState,
19}
20
21impl H2Encoder {
22    fn event(&self, payload: H2EventPayload) -> H2Event {
23        H2Event {
24            payload,
25            stream_id: self.stream_id,
26        }
27    }
28
29    async fn send(&self, payload: H2EventPayload) -> eyre::Result<()> {
30        self.tx
31            .send(self.event(payload))
32            .await
33            .map_err(|_| eyre::eyre!("could not send event to h2 connection handler"))?;
34        Ok(())
35    }
36}
37
38impl Encoder for H2Encoder {
39    async fn write_response(&mut self, res: Response) -> eyre::Result<()> {
40        // TODO: don't panic here
41        assert!(matches!(self.state, EncoderState::ExpectResponseHeaders));
42
43        self.send(H2EventPayload::Headers(res)).await?;
44        self.state = EncoderState::ExpectResponseBody;
45
46        Ok(())
47    }
48
49    // TODO: BodyWriteMode is not relevant for h2
50    async fn write_body_chunk(
51        &mut self,
52        chunk: fluke_buffet::PieceCore,
53        _mode: BodyWriteMode,
54    ) -> eyre::Result<()> {
55        assert!(matches!(self.state, EncoderState::ExpectResponseBody));
56
57        self.send(H2EventPayload::BodyChunk(chunk)).await?;
58        Ok(())
59    }
60
61    // TODO: BodyWriteMode is not relevant for h2
62    async fn write_body_end(&mut self, _mode: BodyWriteMode) -> eyre::Result<()> {
63        assert!(matches!(self.state, EncoderState::ExpectResponseBody));
64
65        self.send(H2EventPayload::BodyEnd).await?;
66        self.state = EncoderState::ResponseDone;
67
68        Ok(())
69    }
70
71    // TODO: handle trailers
72    async fn write_trailers(&mut self, _trailers: Box<crate::Headers>) -> eyre::Result<()> {
73        assert!(matches!(self.state, EncoderState::ResponseDone));
74
75        todo!("write trailers")
76    }
77}
78
79impl Drop for H2Encoder {
80    fn drop(&mut self) {
81        let mut evs = vec![];
82
83        match self.state {
84            EncoderState::ExpectResponseHeaders => {
85                evs.push(self.event(H2EventPayload::Headers(Response {
86                    version: Version::HTTP_11,
87                    status: StatusCode::INTERNAL_SERVER_ERROR,
88                    headers: Default::default(),
89                })));
90                evs.push(self.event(H2EventPayload::BodyEnd));
91            }
92            EncoderState::ExpectResponseBody => {
93                // TODO: this should probably be RST_STREAM instead
94                evs.push(self.event(H2EventPayload::BodyEnd));
95            }
96            EncoderState::ResponseDone => {
97                // ah, good.
98            }
99        }
100
101        if !evs.is_empty() {
102            let tx = self.tx.clone();
103            fluke_buffet::spawn(async move {
104                for ev in evs {
105                    if tx.send(ev).await.is_err() {
106                        debug!("could not send event to h2 connection handler");
107                        break;
108                    }
109                }
110            });
111        }
112    }
113}