1use http::{StatusCode, Version};
2use tokio::sync::mpsc;
3use tracing::debug;
4
5use super::types::{H2Event, H2EventPayload};
6use crate::{h1::body::BodyWriteMode, Encoder, Response};
7use fluke_h2_parse::StreamId;
8
9pub(crate) enum EncoderState {
10 ExpectResponseHeaders,
11 ExpectResponseBody,
12 ResponseDone,
13}
14
15pub struct H2Encoder {
16 pub(crate) stream_id: StreamId,
17 pub(crate) tx: mpsc::Sender<H2Event>,
18 pub(crate) state: EncoderState,
19}
20
21impl H2Encoder {
22 fn event(&self, payload: H2EventPayload) -> H2Event {
23 H2Event {
24 payload,
25 stream_id: self.stream_id,
26 }
27 }
28
29 async fn send(&self, payload: H2EventPayload) -> eyre::Result<()> {
30 self.tx
31 .send(self.event(payload))
32 .await
33 .map_err(|_| eyre::eyre!("could not send event to h2 connection handler"))?;
34 Ok(())
35 }
36}
37
38impl Encoder for H2Encoder {
39 async fn write_response(&mut self, res: Response) -> eyre::Result<()> {
40 assert!(matches!(self.state, EncoderState::ExpectResponseHeaders));
42
43 self.send(H2EventPayload::Headers(res)).await?;
44 self.state = EncoderState::ExpectResponseBody;
45
46 Ok(())
47 }
48
49 async fn write_body_chunk(
51 &mut self,
52 chunk: fluke_buffet::PieceCore,
53 _mode: BodyWriteMode,
54 ) -> eyre::Result<()> {
55 assert!(matches!(self.state, EncoderState::ExpectResponseBody));
56
57 self.send(H2EventPayload::BodyChunk(chunk)).await?;
58 Ok(())
59 }
60
61 async fn write_body_end(&mut self, _mode: BodyWriteMode) -> eyre::Result<()> {
63 assert!(matches!(self.state, EncoderState::ExpectResponseBody));
64
65 self.send(H2EventPayload::BodyEnd).await?;
66 self.state = EncoderState::ResponseDone;
67
68 Ok(())
69 }
70
71 async fn write_trailers(&mut self, _trailers: Box<crate::Headers>) -> eyre::Result<()> {
73 assert!(matches!(self.state, EncoderState::ResponseDone));
74
75 todo!("write trailers")
76 }
77}
78
79impl Drop for H2Encoder {
80 fn drop(&mut self) {
81 let mut evs = vec![];
82
83 match self.state {
84 EncoderState::ExpectResponseHeaders => {
85 evs.push(self.event(H2EventPayload::Headers(Response {
86 version: Version::HTTP_11,
87 status: StatusCode::INTERNAL_SERVER_ERROR,
88 headers: Default::default(),
89 })));
90 evs.push(self.event(H2EventPayload::BodyEnd));
91 }
92 EncoderState::ExpectResponseBody => {
93 evs.push(self.event(H2EventPayload::BodyEnd));
95 }
96 EncoderState::ResponseDone => {
97 }
99 }
100
101 if !evs.is_empty() {
102 let tx = self.tx.clone();
103 fluke_buffet::spawn(async move {
104 for ev in evs {
105 if tx.send(ev).await.is_err() {
106 debug!("could not send event to h2 connection handler");
107 break;
108 }
109 }
110 });
111 }
112 }
113}