wtx/
http2.rs

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
//! HTTP/2
//!
//! 1. Does not support padded headers when writing.
//! 2. Does not support push promises (Deprecated by major third-parties).
//! 3. Does not support prioritization (Deprecated by the RFC).

#[macro_use]
mod macros;

mod client_stream;
mod common_flags;
mod common_stream;
mod continuation_frame;
mod data_frame;
mod frame_init;
mod frame_reader;
mod go_away_frame;
mod headers_frame;
mod hpack_decoder;
mod hpack_encoder;
mod hpack_header;
mod hpack_headers;
mod hpack_static_headers;
mod http2_buffer;
mod http2_data;
mod http2_error;
mod http2_error_code;
mod http2_params;
mod http2_params_send;
mod http2_status;
mod huffman;
mod huffman_tables;
mod index_map;
mod initial_server_header;
mod misc;
mod ping_frame;
mod process_receipt_frame_ty;
mod reset_stream_frame;
mod send_data_mode;
mod send_msg;
mod server_stream;
mod settings_frame;
mod stream_receiver;
mod stream_state;
#[cfg(all(feature = "_async-tests", test))]
mod tests;
mod u31;
mod uri_buffer;
#[cfg(feature = "web-socket")]
mod web_socket_over_stream;
mod window;
mod window_update_frame;

use crate::{
  http::{Method, Protocol, ReqResBuffer, Request},
  http2::misc::{
    frame_reader_rslt, manage_initial_stream_receiving, process_higher_operation_err, protocol_err,
    sorp_mut, write_array,
  },
  misc::{
    Arc, AtomicWaker, ConnectionState, Either, LeaseMut, Lock, PartitionedFilledBuffer, RefCounter,
    StreamReader, StreamWriter, Usize, NOOP_WAKER,
  },
};
pub use client_stream::ClientStream;
pub use common_stream::CommonStream;
use core::{
  future::{poll_fn, Future},
  mem,
  pin::pin,
  sync::atomic::{AtomicBool, Ordering},
  task::Poll,
};
use hashbrown::HashMap;
pub use http2_buffer::Http2Buffer;
pub use http2_data::Http2Data;
pub use http2_error::Http2Error;
pub use http2_error_code::Http2ErrorCode;
pub use http2_params::Http2Params;
pub use http2_status::{Http2RecvStatus, Http2SendStatus};
pub use send_data_mode::{SendDataMode, SendDataModeBytes};
pub use server_stream::ServerStream;
#[cfg(feature = "web-socket")]
pub use web_socket_over_stream::WebSocketOverStream;
pub use window::{Window, Windows};

pub(crate) const MAX_BODY_LEN: u32 = max_body_len!();
pub(crate) const MAX_HPACK_LEN: u32 = max_hpack_len!();
pub(crate) const MAX_CONCURRENT_STREAMS_NUM: u32 = max_concurrent_streams_num!();
pub(crate) const MAX_HEADERS_LEN: u32 = max_headers_len!();
pub(crate) const MAX_FRAME_LEN: u32 = max_frame_len!();
pub(crate) const MAX_FRAME_LEN_LOWER_BOUND: u32 = max_frame_len_lower_bound!();
pub(crate) const MAX_FRAME_LEN_UPPER_BOUND: u32 = max_frame_len_upper_bound!();
pub(crate) const MAX_RECV_STREAMS_NUM: u32 = max_recv_streams_num!();
pub(crate) const READ_BUFFER_LEN: u32 = read_buffer_len!();

const PREFACE: &[u8; 24] = b"PRI * HTTP/2.0\r\n\r\nSM\r\n\r\n";

/// [`Http2`] instance using the mutex from tokio.
#[cfg(feature = "tokio")]
pub type Http2Tokio<HB, SW, const IS_CLIENT: bool> =
  Http2<Http2DataTokio<HB, SW, IS_CLIENT>, IS_CLIENT>;
/// [`Http2Data`] instance using the mutex from tokio.
#[cfg(feature = "tokio")]
pub type Http2DataTokio<HB, SW, const IS_CLIENT: bool> =
  Arc<tokio::sync::Mutex<Http2Data<HB, SW, IS_CLIENT>>>;

pub(crate) type Scrp = HashMap<u31::U31, stream_receiver::StreamControlRecvParams>;
pub(crate) type Sorp = HashMap<u31::U31, stream_receiver::StreamOverallRecvParams>;

/// Negotiates initial "handshakes" or connections and also manages the creation of streams.
#[derive(Debug)]
pub struct Http2<HD, const IS_CLIENT: bool> {
  hd: HD,
  is_conn_open: Arc<AtomicBool>,
  ish_id: u32,
}

impl<HB, HD, SW, const IS_CLIENT: bool> Http2<HD, IS_CLIENT>
where
  HB: LeaseMut<Http2Buffer>,
  HD: RefCounter,
  HD::Item: Lock<Resource = Http2Data<HB, SW, IS_CLIENT>>,
  SW: StreamWriter,
{
  /// See [`ConnectionState`].
  #[inline]
  pub fn connection_state(&self) -> ConnectionState {
    ConnectionState::from(self.is_conn_open.load(Ordering::Relaxed))
  }

  send_go_away_method!();

  #[inline]
  pub(crate) async fn _swap_buffers(&mut self, hb: &mut HB) {
    mem::swap(hb.lease_mut(), self.hd.lock().await.parts_mut().hb);
  }

  #[inline]
  async fn manage_initial_params<const HAS_PREFACE: bool>(
    hb: &mut Http2Buffer,
    hp: &Http2Params,
    stream_writer: &mut SW,
  ) -> crate::Result<(Arc<AtomicBool>, u32, PartitionedFilledBuffer, Arc<AtomicWaker>)> {
    hb.is_conn_open.store(true, Ordering::Relaxed);
    let sf = hp.to_settings_frame();
    let sf_buffer = &mut [0; 45];
    let sf_bytes = sf.bytes(sf_buffer);
    if hp.initial_window_len() == initial_window_len!() {
      if HAS_PREFACE {
        write_array([PREFACE, sf_bytes], &hb.is_conn_open, stream_writer).await?;
      } else {
        write_array([sf_bytes], &hb.is_conn_open, stream_writer).await?;
      }
    } else {
      let wuf = window_update_frame::WindowUpdateFrame::new(
        hp.initial_window_len().wrapping_sub(initial_window_len!()).into(),
        u31::U31::ZERO,
      )?;
      if HAS_PREFACE {
        write_array([PREFACE, sf_bytes, &wuf.bytes()], &hb.is_conn_open, stream_writer).await?;
      } else {
        write_array([sf_bytes, &wuf.bytes()], &hb.is_conn_open, stream_writer).await?;
      }
    }
    hb.hpack_dec.set_max_bytes(hp.max_hpack_len().0);
    hb.hpack_dec.reserve(4, 256)?;
    hb.hpack_enc.set_max_dyn_super_bytes(hp.max_hpack_len().1);
    hb.hpack_enc.reserve(4, 256)?;
    hb.pfb._reserve(*Usize::from(hp.read_buffer_len()))?;
    Ok((
      Arc::clone(&hb.is_conn_open),
      hp.max_frame_len(),
      mem::take(&mut hb.pfb),
      Arc::clone(&hb.read_frame_waker),
    ))
  }
}

impl<HB, HD, SW> Http2<HD, false>
where
  HB: LeaseMut<Http2Buffer>,
  HD: RefCounter,
  HD::Item: Lock<Resource = Http2Data<HB, SW, false>>,
  SW: StreamWriter,
{
  /// Accepts an initial connection sending the local parameters to the remote peer.
  #[inline]
  pub async fn accept<SR>(
    mut hb: HB,
    hp: Http2Params,
    (mut stream_reader, mut stream_writer): (SR, SW),
  ) -> crate::Result<(impl Future<Output = ()>, Self)>
  where
    SR: StreamReader,
  {
    hb.lease_mut().clear();
    let mut buffer = [0; 24];
    let _ = stream_reader.read(&mut buffer).await?;
    if &buffer != PREFACE {
      let _rslt = stream_writer
        .write_all(
          &go_away_frame::GoAwayFrame::new(Http2ErrorCode::ProtocolError, u31::U31::ZERO).bytes(),
        )
        .await;
      return Err(protocol_err(Http2Error::NoPreface));
    }
    let (is_conn_open, max_frame_len, pfb, read_frame_waker) =
      Self::manage_initial_params::<false>(hb.lease_mut(), &hp, &mut stream_writer).await?;
    let hd = HD::new(HD::Item::new(Http2Data::new(hb, hp, stream_writer)));
    let this = Self { hd: hd.clone(), is_conn_open: Arc::clone(&is_conn_open), ish_id: 0 };
    Ok((
      frame_reader::frame_reader(
        hd,
        is_conn_open,
        max_frame_len,
        pfb,
        read_frame_waker,
        stream_reader,
      ),
      this,
    ))
  }

  /// Awaits for an initial header to create a stream.
  ///
  /// Returns [`Either::Left`] if the network connection has been closed, either locally
  /// or externally.
  #[inline]
  pub async fn stream<T>(
    &mut self,
    rrb: ReqResBuffer,
    cb: impl FnOnce(Request<&mut ReqResBuffer>, Option<Protocol>) -> T,
  ) -> crate::Result<Either<ReqResBuffer, (ServerStream<HD>, T)>> {
    let Self { hd, is_conn_open, ish_id } = self;
    let curr_ish_id = *ish_id;
    *ish_id = ish_id.wrapping_add(1);
    let rrb_opt = &mut Some(rrb);
    let mut lock_pin = pin!(hd.lock());
    let rslt = poll_fn(|cx| {
      let mut guard = lock_pin!(cx, hd, lock_pin);
      let hdpm = guard.parts_mut();
      if let Some(mut this_rrb) = rrb_opt.take() {
        if !manage_initial_stream_receiving(is_conn_open, &mut this_rrb) {
          return Poll::Ready(Ok(Either::Left((
            this_rrb,
            frame_reader_rslt(hdpm.frame_reader_error),
          ))));
        }
        drop(hdpm.hb.initial_server_headers.push_back(
          curr_ish_id,
          initial_server_header::InitialServerHeader {
            method: Method::Get,
            protocol: None,
            rrb: this_rrb,
            stream_id: u31::U31::ZERO,
            waker: cx.waker().clone(),
          },
        ));
        Poll::Pending
      } else {
        let Some(ish) = hdpm.hb.initial_server_headers.remove(&curr_ish_id) else {
          return Poll::Ready(Err(protocol_err(Http2Error::UnknownInitialServerHeaderId)));
        };
        hdpm.hb.initial_server_headers.decrease_cursor();
        if !is_conn_open.load(Ordering::Relaxed) {
          let this_rrb = if ish.stream_id.is_zero() {
            ish.rrb
          } else {
            mem::take(&mut sorp_mut(&mut hdpm.hb.sorp, ish.stream_id)?.rrb)
          };
          return Poll::Ready(Ok(Either::Left((
            this_rrb,
            frame_reader_rslt(hdpm.frame_reader_error),
          ))));
        }
        Poll::Ready(Ok(Either::Right((ish.method, ish.protocol, ish.stream_id, guard))))
      }
    })
    .await;
    match rslt? {
      Either::Left(elem) => {
        elem.1?;
        Ok(Either::Left(elem.0))
      }
      Either::Right((method, protocol, stream_id, mut guard)) => {
        let sorp = sorp_mut(&mut guard.parts_mut().hb.sorp, stream_id)?;
        let elem_cb = cb(Request::http2(method, &mut sorp.rrb), protocol);
        drop(guard);
        Ok(Either::Right((
          ServerStream::new(
            hd.clone(),
            Arc::clone(is_conn_open),
            method,
            protocol,
            _trace_span!("New server stream", stream_id = %stream_id),
            stream_id,
          ),
          elem_cb,
        )))
      }
    }
  }
}

impl<HB, HD, SW> Http2<HD, true>
where
  HB: LeaseMut<Http2Buffer>,
  HD: RefCounter,
  HD::Item: Lock<Resource = Http2Data<HB, SW, true>>,
  SW: StreamWriter,
{
  /// Tries to connect to a server sending the local parameters.
  #[inline]
  pub async fn connect<SR>(
    mut hb: HB,
    mut hp: Http2Params,
    (stream_reader, mut stream_writer): (SR, SW),
  ) -> crate::Result<(impl Future<Output = ()>, Self)>
  where
    SR: StreamReader,
  {
    hb.lease_mut().clear();
    hp = hp.set_enable_connect_protocol(false);
    let (is_conn_open, max_frame_len, pfb, read_frame_waker) =
      Self::manage_initial_params::<true>(hb.lease_mut(), &hp, &mut stream_writer).await?;
    let hd = HD::new(HD::Item::new(Http2Data::new(hb, hp, stream_writer)));
    let this = Self { hd: hd.clone(), is_conn_open: Arc::clone(&is_conn_open), ish_id: 0 };
    Ok((
      frame_reader::frame_reader(
        hd,
        is_conn_open,
        max_frame_len,
        pfb,
        read_frame_waker,
        stream_reader,
      ),
      this,
    ))
  }

  /// Opens a local stream.
  #[inline]
  pub async fn stream(&mut self) -> crate::Result<ClientStream<HD>> {
    let mut guard = self.hd.lock().await;
    let hdpm = guard.parts_mut();
    if hdpm.hb.sorp.len() >= *Usize::from(hdpm.hp.max_concurrent_streams_num()) {
      drop(guard);
      let err = protocol_err(Http2Error::ExceedAmountOfActiveConcurrentStreams);
      process_higher_operation_err(&err, &self.hd).await;
      return Err(err);
    }
    let stream_id = *hdpm.last_stream_id;
    let span = _trace_span!("New client stream", stream_id = %stream_id);
    drop(hdpm.hb.scrp.insert(
      stream_id,
      stream_receiver::StreamControlRecvParams {
        is_stream_open: true,
        stream_state: stream_state::StreamState::Idle,
        waker: NOOP_WAKER.clone(),
        windows: Windows::initial(hdpm.hp, hdpm.hps),
      },
    ));
    *hdpm.last_stream_id = hdpm.last_stream_id.wrapping_add(u31::U31::TWO);
    drop(guard);
    Ok(ClientStream::new(self.hd.clone(), Arc::clone(&self.is_conn_open), span, stream_id))
  }
}

impl<HD, const IS_CLIENT: bool> Clone for Http2<HD, IS_CLIENT>
where
  HD: RefCounter,
{
  #[inline]
  fn clone(&self) -> Self {
    Self { hd: self.hd.clone(), is_conn_open: Arc::clone(&self.is_conn_open), ish_id: self.ish_id }
  }
}