rama_http_core/h2/
share.rs

1use crate::h2::codec::UserError;
2use crate::h2::frame::Reason;
3use crate::h2::proto::{self, WindowSize};
4
5use bytes::{Buf, Bytes};
6use rama_http_types::HeaderMap;
7use rama_http_types::proto::h1::headers::original::OriginalHttp1Headers;
8
9use std::fmt;
10use std::pin::Pin;
11use std::task::{Context, Poll};
12
13/// Sends the body stream and trailers to the remote peer.
14///
15/// # Overview
16///
17/// A `SendStream` is provided by [`SendRequest`] and [`SendResponse`] once the
18/// HTTP/2 message header has been sent sent. It is used to stream the message
19/// body and send the message trailers. See method level documentation for more
20/// details.
21///
22/// The `SendStream` instance is also used to manage outbound flow control.
23///
24/// If a `SendStream` is dropped without explicitly closing the send stream, a
25/// `RST_STREAM` frame will be sent. This essentially cancels the request /
26/// response exchange.
27///
28/// The ways to explicitly close the send stream are:
29///
30/// * Set `end_of_stream` to true when calling [`send_request`],
31///   [`send_response`], or [`send_data`].
32/// * Send trailers with [`send_trailers`].
33/// * Explicitly reset the stream with [`send_reset`].
34///
35/// # Flow control
36///
37/// In HTTP/2, data cannot be sent to the remote peer unless there is
38/// available window capacity on both the stream and the connection. When a data
39/// frame is sent, both the stream window and the connection window are
40/// decremented. When the stream level window reaches zero, no further data can
41/// be sent on that stream. When the connection level window reaches zero, no
42/// further data can be sent on any stream for that connection.
43///
44/// When the remote peer is ready to receive more data, it sends `WINDOW_UPDATE`
45/// frames. These frames increment the windows. See the [specification] for more
46/// details on the principles of HTTP/2 flow control.
47///
48/// The implications for sending data are that the caller **should** ensure that
49/// both the stream and the connection has available window capacity before
50/// loading the data to send into memory. The `SendStream` instance provides the
51/// necessary APIs to perform this logic. This, however, is not an obligation.
52/// If the caller attempts to send data on a stream when there is no available
53/// window capacity, the library will buffer the data until capacity becomes
54/// available, at which point the buffer will be flushed to the connection.
55///
56/// **NOTE**: There is no bound on the amount of data that the library will
57/// buffer. If you are sending large amounts of data, you really should hook
58/// into the flow control lifecycle. Otherwise, you risk using up significant
59/// amounts of memory.
60///
61/// To hook into the flow control lifecycle, the caller signals to the library
62/// that it intends to send data by calling [`reserve_capacity`], specifying the
63/// amount of data, in octets, that the caller intends to send. After this,
64/// `poll_capacity` is used to be notified when the requested capacity is
65/// assigned to the stream. Once [`poll_capacity`] returns `Ready` with the number
66/// of octets available to the stream, the caller is able to actually send the
67/// data using [`send_data`].
68///
69/// Because there is also a connection level window that applies to **all**
70/// streams on a connection, when capacity is assigned to a stream (indicated by
71/// `poll_capacity` returning `Ready`), this capacity is reserved on the
72/// connection and will **not** be assigned to any other stream. If data is
73/// never written to the stream, that capacity is effectively lost to other
74/// streams and this introduces the risk of deadlocking a connection.
75///
76/// To avoid throttling data on a connection, the caller should not reserve
77/// capacity until ready to send data and once any capacity is assigned to the
78/// stream, the caller should immediately send data consuming this capacity.
79/// There is no guarantee as to when the full capacity requested will become
80/// available. For example, if the caller requests 64 KB of data and 512 bytes
81/// become available, the caller should immediately send 512 bytes of data.
82///
83/// See [`reserve_capacity`] documentation for more details.
84///
85/// [`SendRequest`]: client/struct.SendRequest.html
86/// [`SendResponse`]: server/struct.SendResponse.html
87/// [specification]: http://httpwg.org/specs/rfc7540.html#FlowControl
88/// [`reserve_capacity`]: #method.reserve_capacity
89/// [`poll_capacity`]: #method.poll_capacity
90/// [`send_data`]: #method.send_data
91/// [`send_request`]: client/struct.SendRequest.html#method.send_request
92/// [`send_response`]: server/struct.SendResponse.html#method.send_response
93/// [`send_data`]: #method.send_data
94/// [`send_trailers`]: #method.send_trailers
95/// [`send_reset`]: #method.send_reset
96#[derive(Debug)]
97pub struct SendStream<B> {
98    inner: proto::StreamRef<B>,
99}
100
101/// A stream identifier, as described in [Section 5.1.1] of RFC 7540.
102///
103/// Streams are identified with an unsigned 31-bit integer. Streams
104/// initiated by a client MUST use odd-numbered stream identifiers; those
105/// initiated by the server MUST use even-numbered stream identifiers.  A
106/// stream identifier of zero (0x0) is used for connection control
107/// messages; the stream identifier of zero cannot be used to establish a
108/// new stream.
109///
110/// [Section 5.1.1]: https://tools.ietf.org/html/rfc7540#section-5.1.1
111#[derive(Debug, Clone, Copy, Eq, PartialEq, Hash)]
112pub struct StreamId(u32);
113
114impl From<StreamId> for u32 {
115    fn from(src: StreamId) -> Self {
116        src.0
117    }
118}
119
120/// Receives the body stream and trailers from the remote peer.
121///
122/// A `RecvStream` is provided by [`client::ResponseFuture`] and
123/// [`server::Connection`] with the received HTTP/2 message head (the response
124/// and request head respectively).
125///
126/// A `RecvStream` instance is used to receive the streaming message body and
127/// any trailers from the remote peer. It is also used to manage inbound flow
128/// control.
129///
130/// See method level documentation for more details on receiving data. See
131/// [`FlowControl`] for more details on inbound flow control.
132///
133/// [`client::ResponseFuture`]: client/struct.ResponseFuture.html
134/// [`server::Connection`]: server/struct.Connection.html
135/// [`FlowControl`]: struct.FlowControl.html
136/// [`Stream`]: https://docs.rs/futures/0.1/futures/stream/trait.Stream.html
137#[must_use = "streams do nothing unless polled"]
138pub struct RecvStream {
139    inner: FlowControl,
140}
141
142/// A handle to release window capacity to a remote stream.
143///
144/// This type allows the caller to manage inbound data [flow control]. The
145/// caller is expected to call [`release_capacity`] after dropping data frames.
146///
147/// # Overview
148///
149/// Each stream has a window size. This window size is the maximum amount of
150/// inbound data that can be in-flight. In-flight data is defined as data that
151/// has been received, but not yet released.
152///
153/// When a stream is created, the window size is set to the connection's initial
154/// window size value. When a data frame is received, the window size is then
155/// decremented by size of the data frame before the data is provided to the
156/// caller. As the caller finishes using the data, [`release_capacity`] must be
157/// called. This will then increment the window size again, allowing the peer to
158/// send more data.
159///
160/// There is also a connection level window as well as the stream level window.
161/// Received data counts against the connection level window as well and calls
162/// to [`release_capacity`] will also increment the connection level window.
163///
164/// # Sending `WINDOW_UPDATE` frames
165///
166/// `WINDOW_UPDATE` frames will not be sent out for **every** call to
167/// `release_capacity`, as this would end up slowing down the protocol. Instead,
168/// `h2` waits until the window size is increased to a certain threshold and
169/// then sends out a single `WINDOW_UPDATE` frame representing all the calls to
170/// `release_capacity` since the last `WINDOW_UPDATE` frame.
171///
172/// This essentially batches window updating.
173///
174/// # Scenarios
175///
176/// Following is a basic scenario with an HTTP/2 connection containing a
177/// single active stream.
178///
179/// * A new stream is activated. The receive window is initialized to 1024 (the
180///   value of the initial window size for this connection).
181/// * A `DATA` frame is received containing a payload of 600 bytes.
182/// * The receive window size is reduced to 424 bytes.
183/// * [`release_capacity`] is called with 200.
184/// * The receive window size is now 624 bytes. The peer may send no more than
185///   this.
186/// * A `DATA` frame is received with a payload of 624 bytes.
187/// * The window size is now 0 bytes. The peer may not send any more data.
188/// * [`release_capacity`] is called with 1024.
189/// * The receive window size is now 1024 bytes. The peer may now send more
190///   data.
191///
192/// [flow control]: ../index.html#flow-control
193/// [`release_capacity`]: struct.FlowControl.html#method.release_capacity
194#[derive(Clone, Debug)]
195pub struct FlowControl {
196    inner: proto::OpaqueStreamRef,
197}
198
199/// A handle to send and receive PING frames with the peer.
200// NOT Clone on purpose
201pub struct PingPong {
202    inner: proto::UserPings,
203}
204
205/// Sent via [`PingPong`][] to send a PING frame to a peer.
206///
207/// [`PingPong`]: struct.PingPong.html
208pub struct Ping {
209    _p: (),
210}
211
212/// Received via [`PingPong`][] when a peer acknowledges a [`Ping`][].
213///
214/// [`PingPong`]: struct.PingPong.html
215/// [`Ping`]: struct.Ping.html
216pub struct Pong {
217    _p: (),
218}
219
220// ===== impl SendStream =====
221
222impl<B: Buf> SendStream<B> {
223    pub(crate) fn new(inner: proto::StreamRef<B>) -> Self {
224        SendStream { inner }
225    }
226
227    /// Requests capacity to send data.
228    ///
229    /// This function is used to express intent to send data. This requests
230    /// connection level capacity. Once the capacity is available, it is
231    /// assigned to the stream and not reused by other streams.
232    ///
233    /// This function may be called repeatedly. The `capacity` argument is the
234    /// **total** amount of requested capacity. Sequential calls to
235    /// `reserve_capacity` are *not* additive. Given the following:
236    ///
237    /// ```rust
238    /// # use rama_http_core::h2::*;
239    /// # fn doc(mut send_stream: SendStream<&'static [u8]>) {
240    /// send_stream.reserve_capacity(100);
241    /// send_stream.reserve_capacity(200);
242    /// # }
243    /// ```
244    ///
245    /// After the second call to `reserve_capacity`, the *total* requested
246    /// capacity will be 200.
247    ///
248    /// `reserve_capacity` is also used to cancel previous capacity requests.
249    /// Given the following:
250    ///
251    /// ```rust
252    /// # use rama_http_core::h2::*;
253    /// # fn doc(mut send_stream: SendStream<&'static [u8]>) {
254    /// send_stream.reserve_capacity(100);
255    /// send_stream.reserve_capacity(0);
256    /// # }
257    /// ```
258    ///
259    /// After the second call to `reserve_capacity`, the *total* requested
260    /// capacity will be 0, i.e. there is no requested capacity for the stream.
261    ///
262    /// If `reserve_capacity` is called with a lower value than the amount of
263    /// capacity **currently** assigned to the stream, this capacity will be
264    /// returned to the connection to be re-assigned to other streams.
265    ///
266    /// Also, the amount of capacity that is reserved gets decremented as data
267    /// is sent. For example:
268    ///
269    /// ```rust
270    /// # use rama_http_core::h2::*;
271    /// # async fn doc(mut send_stream: SendStream<&'static [u8]>) {
272    /// send_stream.reserve_capacity(100);
273    ///
274    /// send_stream.send_data(b"hello", false).unwrap();
275    /// // At this point, the total amount of requested capacity is 95 bytes.
276    ///
277    /// // Calling `reserve_capacity` with `100` again essentially requests an
278    /// // additional 5 bytes.
279    /// send_stream.reserve_capacity(100);
280    /// # }
281    /// ```
282    ///
283    /// See [Flow control](struct.SendStream.html#flow-control) for an overview
284    /// of how send flow control works.
285    pub fn reserve_capacity(&mut self, capacity: usize) {
286        // TODO: Check for overflow
287        self.inner.reserve_capacity(capacity as WindowSize)
288    }
289
290    /// Returns the stream's current send capacity.
291    ///
292    /// This allows the caller to check the current amount of available capacity
293    /// before sending data.
294    pub fn capacity(&self) -> usize {
295        self.inner.capacity() as usize
296    }
297
298    /// Requests to be notified when the stream's capacity increases.
299    ///
300    /// Before calling this, capacity should be requested with
301    /// `reserve_capacity`. Once capacity is requested, the connection will
302    /// assign capacity to the stream **as it becomes available**. There is no
303    /// guarantee as to when and in what increments capacity gets assigned to
304    /// the stream.
305    ///
306    /// To get notified when the available capacity increases, the caller calls
307    /// `poll_capacity`, which returns `Ready(Some(n))` when `n` has been
308    /// increased by the connection. Note that `n` here represents the **total**
309    /// amount of assigned capacity at that point in time. It is also possible
310    /// that `n` is lower than the previous call if, since then, the caller has
311    /// sent data.
312    pub fn poll_capacity(
313        &mut self,
314        cx: &mut Context,
315    ) -> Poll<Option<Result<usize, crate::h2::Error>>> {
316        self.inner
317            .poll_capacity(cx)
318            .map_ok(|w| w as usize)
319            .map_err(Into::into)
320    }
321
322    /// Sends a single data frame to the remote peer.
323    ///
324    /// This function may be called repeatedly as long as `end_of_stream` is set
325    /// to `false`. Setting `end_of_stream` to `true` sets the end stream flag
326    /// on the data frame. Any further calls to `send_data` or `send_trailers`
327    /// will return an [`Error`].
328    ///
329    /// `send_data` can be called without reserving capacity. In this case, the
330    /// data is buffered and the capacity is implicitly requested. Once the
331    /// capacity becomes available, the data is flushed to the connection.
332    /// However, this buffering is unbounded. As such, sending large amounts of
333    /// data without reserving capacity before hand could result in large
334    /// amounts of data being buffered in memory.
335    ///
336    /// [`Error`]: struct.Error.html
337    pub fn send_data(&mut self, data: B, end_of_stream: bool) -> Result<(), crate::h2::Error> {
338        self.inner
339            .send_data(data, end_of_stream)
340            .map_err(Into::into)
341    }
342
343    /// Sends trailers to the remote peer.
344    ///
345    /// Sending trailers implicitly closes the send stream. Once the send stream
346    /// is closed, no more data can be sent.
347    pub fn send_trailers(
348        &mut self,
349        trailers: HeaderMap,
350        trailer_order: OriginalHttp1Headers,
351    ) -> Result<(), crate::h2::Error> {
352        self.inner
353            .send_trailers(trailers, trailer_order)
354            .map_err(Into::into)
355    }
356
357    /// Resets the stream.
358    ///
359    /// This cancels the request / response exchange. If the response has not
360    /// yet been received, the associated `ResponseFuture` will return an
361    /// [`Error`] to reflect the canceled exchange.
362    ///
363    /// [`Error`]: struct.Error.html
364    pub fn send_reset(&mut self, reason: Reason) {
365        self.inner.send_reset(reason)
366    }
367
368    /// Polls to be notified when the client resets this stream.
369    ///
370    /// If stream is still open, this returns `Poll::Pending`, and
371    /// registers the task to be notified if a `RST_STREAM` is received.
372    ///
373    /// If a `RST_STREAM` frame is received for this stream, calling this
374    /// method will yield the `Reason` for the reset.
375    ///
376    /// # Error
377    ///
378    /// If connection sees an error, this returns that error instead of a
379    /// `Reason`.
380    pub fn poll_reset(&mut self, cx: &mut Context) -> Poll<Result<Reason, crate::h2::Error>> {
381        self.inner.poll_reset(cx, proto::PollReset::Streaming)
382    }
383
384    /// Returns the stream ID of this `SendStream`.
385    ///
386    /// # Panics
387    ///
388    /// If the lock on the stream store has been poisoned.
389    pub fn stream_id(&self) -> StreamId {
390        StreamId::from_internal(self.inner.stream_id())
391    }
392}
393
394// ===== impl StreamId =====
395
396impl StreamId {
397    pub(crate) fn from_internal(id: crate::h2::frame::StreamId) -> Self {
398        StreamId(id.into())
399    }
400
401    /// Returns the `u32` corresponding to this `StreamId`
402    ///
403    /// # Note
404    ///
405    /// This is the same as the `From<StreamId>` implementation, but
406    /// included as an inherent method because that implementation doesn't
407    /// appear in rustdocs, as well as a way to force the type instead of
408    /// relying on inference.
409    pub fn as_u32(&self) -> u32 {
410        (*self).into()
411    }
412}
413// ===== impl RecvStream =====
414
415impl RecvStream {
416    pub(crate) fn new(inner: FlowControl) -> Self {
417        RecvStream { inner }
418    }
419
420    /// Get the next data frame.
421    pub async fn data(&mut self) -> Option<Result<Bytes, crate::h2::Error>> {
422        crate::h2::poll_fn(move |cx| self.poll_data(cx)).await
423    }
424
425    /// Get optional trailers for this stream.
426    pub async fn trailers(&mut self) -> Result<Option<HeaderMap>, crate::h2::Error> {
427        crate::h2::poll_fn(move |cx| self.poll_trailers(cx)).await
428    }
429
430    /// Poll for the next data frame.
431    pub fn poll_data(
432        &mut self,
433        cx: &mut Context<'_>,
434    ) -> Poll<Option<Result<Bytes, crate::h2::Error>>> {
435        self.inner.inner.poll_data(cx).map_err(Into::into)
436    }
437
438    #[doc(hidden)]
439    pub fn poll_trailers(
440        &mut self,
441        cx: &mut Context,
442    ) -> Poll<Result<Option<HeaderMap>, crate::h2::Error>> {
443        match ready!(self.inner.inner.poll_trailers(cx)) {
444            Some(Ok(map)) => Poll::Ready(Ok(Some(map))),
445            Some(Err(e)) => Poll::Ready(Err(e.into())),
446            None => Poll::Ready(Ok(None)),
447        }
448    }
449
450    /// Returns true if the receive half has reached the end of stream.
451    ///
452    /// A return value of `true` means that calls to `poll` and `poll_trailers`
453    /// will both return `None`.
454    pub fn is_end_stream(&self) -> bool {
455        self.inner.inner.is_end_stream()
456    }
457
458    /// Get a mutable reference to this stream's `FlowControl`.
459    ///
460    /// It can be used immediately, or cloned to be used later.
461    pub fn flow_control(&mut self) -> &mut FlowControl {
462        &mut self.inner
463    }
464
465    /// Returns the stream ID of this stream.
466    ///
467    /// # Panics
468    ///
469    /// If the lock on the stream store has been poisoned.
470    pub fn stream_id(&self) -> StreamId {
471        self.inner.stream_id()
472    }
473}
474
475impl futures_core::Stream for RecvStream {
476    type Item = Result<Bytes, crate::h2::Error>;
477
478    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
479        self.poll_data(cx)
480    }
481}
482
483impl fmt::Debug for RecvStream {
484    fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
485        fmt.debug_struct("RecvStream")
486            .field("inner", &self.inner)
487            .finish()
488    }
489}
490
491impl Drop for RecvStream {
492    fn drop(&mut self) {
493        // Eagerly clear any received DATA frames now, since its no longer
494        // possible to retrieve them. However, this will be called
495        // again once *all* stream refs have been dropped, since
496        // this won't send a RST_STREAM frame, in case the user wishes to
497        // still *send* DATA.
498        self.inner.inner.clear_recv_buffer();
499    }
500}
501
502// ===== impl FlowControl =====
503
504impl FlowControl {
505    pub(crate) fn new(inner: proto::OpaqueStreamRef) -> Self {
506        FlowControl { inner }
507    }
508
509    /// Returns the stream ID of the stream whose capacity will
510    /// be released by this `FlowControl`.
511    pub fn stream_id(&self) -> StreamId {
512        StreamId::from_internal(self.inner.stream_id())
513    }
514
515    /// Get the current available capacity of data this stream *could* receive.
516    pub fn available_capacity(&self) -> isize {
517        self.inner.available_recv_capacity()
518    }
519
520    /// Get the currently *used* capacity for this stream.
521    ///
522    /// This is the amount of bytes that can be released back to the remote.
523    pub fn used_capacity(&self) -> usize {
524        self.inner.used_recv_capacity() as usize
525    }
526
527    /// Release window capacity back to remote stream.
528    ///
529    /// This releases capacity back to the stream level and the connection level
530    /// windows. Both window sizes will be increased by `sz`.
531    ///
532    /// See [struct level] documentation for more details.
533    ///
534    /// # Errors
535    ///
536    /// This function errors if increasing the receive window size by `sz` would
537    /// result in a window size greater than the target window size. In other
538    /// words, the caller cannot release more capacity than data has been
539    /// received. If 1024 bytes of data have been received, at most 1024 bytes
540    /// can be released.
541    ///
542    /// [struct level]: #
543    pub fn release_capacity(&mut self, sz: usize) -> Result<(), crate::h2::Error> {
544        if sz > proto::MAX_WINDOW_SIZE as usize {
545            return Err(UserError::ReleaseCapacityTooBig.into());
546        }
547        self.inner
548            .release_capacity(sz as proto::WindowSize)
549            .map_err(Into::into)
550    }
551}
552
553// ===== impl PingPong =====
554
555impl PingPong {
556    pub(crate) fn new(inner: proto::UserPings) -> Self {
557        PingPong { inner }
558    }
559
560    /// Send a PING frame and wait for the peer to send the pong.
561    pub async fn ping(&mut self, ping: Ping) -> Result<Pong, crate::h2::Error> {
562        self.send_ping(ping)?;
563        crate::h2::poll_fn(|cx| self.poll_pong(cx)).await
564    }
565
566    #[doc(hidden)]
567    pub fn send_ping(&mut self, ping: Ping) -> Result<(), crate::h2::Error> {
568        // Passing a `Ping` here is just to be forwards-compatible with
569        // eventually allowing choosing a ping payload. For now, we can
570        // just ignore it.
571        let _ = ping;
572
573        self.inner.send_ping().map_err(|err| match err {
574            Some(err) => err.into(),
575            None => UserError::SendPingWhilePending.into(),
576        })
577    }
578
579    #[doc(hidden)]
580    pub fn poll_pong(&mut self, cx: &mut Context) -> Poll<Result<Pong, crate::h2::Error>> {
581        ready!(self.inner.poll_pong(cx))?;
582        Poll::Ready(Ok(Pong { _p: () }))
583    }
584}
585
586impl fmt::Debug for PingPong {
587    fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
588        fmt.debug_struct("PingPong").finish()
589    }
590}
591
592// ===== impl Ping =====
593
594impl Ping {
595    /// Creates a new opaque `Ping` to be sent via a [`PingPong`][].
596    ///
597    /// The payload is "opaque", such that it shouldn't be depended on.
598    ///
599    /// [`PingPong`]: struct.PingPong.html
600    pub fn opaque() -> Ping {
601        Ping { _p: () }
602    }
603}
604
605impl fmt::Debug for Ping {
606    fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
607        fmt.debug_struct("Ping").finish()
608    }
609}
610
611// ===== impl Pong =====
612
613impl fmt::Debug for Pong {
614    fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
615        fmt.debug_struct("Pong").finish()
616    }
617}