monoio_http/h2/share.rs
1#[cfg(feature = "stream")]
2use std::pin::Pin;
3use std::{
4 fmt,
5 task::{Context, Poll},
6};
7
8use bytes::{Buf, Bytes};
9use http::HeaderMap;
10
11use crate::{
12 common::body::Body,
13 h2::{
14 codec::UserError,
15 frame::Reason,
16 proto::{self, WindowSize},
17 },
18};
19
20/// Sends the body stream and trailers to the remote peer.
21///
22/// # Overview
23///
24/// A `SendStream` is provided by [`SendRequest`] and [`SendResponse`] once the
25/// HTTP/2 message header has been sent sent. It is used to stream the message
26/// body and send the message trailers. See method level documentation for more
27/// details.
28///
29/// The `SendStream` instance is also used to manage outbound flow control.
30///
31/// If a `SendStream` is dropped without explicitly closing the send stream, a
32/// `RST_STREAM` frame will be sent. This essentially cancels the request /
33/// response exchange.
34///
35/// The ways to explicitly close the send stream are:
36///
37/// * Set `end_of_stream` to true when calling [`send_request`], [`send_response`], or
38/// [`send_data`].
39/// * Send trailers with [`send_trailers`].
40/// * Explicitly reset the stream with [`send_reset`].
41///
42/// # Flow control
43///
44/// In HTTP/2, data cannot be sent to the remote peer unless there is
45/// available window capacity on both the stream and the connection. When a data
46/// frame is sent, both the stream window and the connection window are
47/// decremented. When the stream level window reaches zero, no further data can
48/// be sent on that stream. When the connection level window reaches zero, no
49/// further data can be sent on any stream for that connection.
50///
51/// When the remote peer is ready to receive more data, it sends `WINDOW_UPDATE`
52/// frames. These frames increment the windows. See the [specification] for more
53/// details on the principles of HTTP/2 flow control.
54///
55/// The implications for sending data are that the caller **should** ensure that
56/// both the stream and the connection has available window capacity before
57/// loading the data to send into memory. The `SendStream` instance provides the
58/// necessary APIs to perform this logic. This, however, is not an obligation.
59/// If the caller attempts to send data on a stream when there is no available
60/// window capacity, the library will buffer the data until capacity becomes
61/// available, at which point the buffer will be flushed to the connection.
62///
63/// **NOTE**: There is no bound on the amount of data that the library will
64/// buffer. If you are sending large amounts of data, you really should hook
65/// into the flow control lifecycle. Otherwise, you risk using up significant
66/// amounts of memory.
67///
68/// To hook into the flow control lifecycle, the caller signals to the library
69/// that it intends to send data by calling [`reserve_capacity`], specifying the
70/// amount of data, in octets, that the caller intends to send. After this,
71/// `poll_capacity` is used to be notified when the requested capacity is
72/// assigned to the stream. Once [`poll_capacity`] returns `Ready` with the number
73/// of octets available to the stream, the caller is able to actually send the
74/// data using [`send_data`].
75///
76/// Because there is also a connection level window that applies to **all**
77/// streams on a connection, when capacity is assigned to a stream (indicated by
78/// `poll_capacity` returning `Ready`), this capacity is reserved on the
79/// connection and will **not** be assigned to any other stream. If data is
80/// never written to the stream, that capacity is effectively lost to other
81/// streams and this introduces the risk of deadlocking a connection.
82///
83/// To avoid throttling data on a connection, the caller should not reserve
84/// capacity until ready to send data and once any capacity is assigned to the
85/// stream, the caller should immediately send data consuming this capacity.
86/// There is no guarantee as to when the full capacity requested will become
87/// available. For example, if the caller requests 64 KB of data and 512 bytes
88/// become available, the caller should immediately send 512 bytes of data.
89///
90/// See [`reserve_capacity`] documentation for more details.
91///
92/// [`SendRequest`]: client/struct.SendRequest.html
93/// [`SendResponse`]: server/struct.SendResponse.html
94/// [specification]: http://httpwg.org/specs/rfc7540.html#FlowControl
95/// [`reserve_capacity`]: #method.reserve_capacity
96/// [`poll_capacity`]: #method.poll_capacity
97/// [`send_data`]: #method.send_data
98/// [`send_request`]: client/struct.SendRequest.html#method.send_request
99/// [`send_response`]: server/struct.SendResponse.html#method.send_response
100/// [`send_data`]: #method.send_data
101/// [`send_trailers`]: #method.send_trailers
102/// [`send_reset`]: #method.send_reset
103#[derive(Debug)]
104pub struct SendStream<B> {
105 inner: proto::StreamRef<B>,
106}
107
108/// A stream identifier, as described in [Section 5.1.1] of RFC 7540.
109///
110/// Streams are identified with an unsigned 31-bit integer. Streams
111/// initiated by a client MUST use odd-numbered stream identifiers; those
112/// initiated by the server MUST use even-numbered stream identifiers. A
113/// stream identifier of zero (0x0) is used for connection control
114/// messages; the stream identifier of zero cannot be used to establish a
115/// new stream.
116///
117/// [Section 5.1.1]: https://tools.ietf.org/html/rfc7540#section-5.1.1
118#[derive(Debug, Clone, Copy, Eq, PartialEq, Hash)]
119pub struct StreamId(u32);
120
121impl From<StreamId> for u32 {
122 fn from(src: StreamId) -> Self {
123 src.0
124 }
125}
126
127/// Receives the body stream and trailers from the remote peer.
128///
129/// A `RecvStream` is provided by [`client::ResponseFuture`] and
130/// [`server::Connection`] with the received HTTP/2 message head (the response
131/// and request head respectively).
132///
133/// A `RecvStream` instance is used to receive the streaming message body and
134/// any trailers from the remote peer. It is also used to manage inbound flow
135/// control.
136///
137/// See method level documentation for more details on receiving data. See
138/// [`FlowControl`] for more details on inbound flow control.
139///
140/// [`client::ResponseFuture`]: client/struct.ResponseFuture.html
141/// [`server::Connection`]: server/struct.Connection.html
142/// [`FlowControl`]: struct.FlowControl.html
143/// [`Stream`]: https://docs.rs/futures/0.1/futures/stream/trait.Stream.html
144#[must_use = "streams do nothing unless polled"]
145#[derive(Clone)]
146pub struct RecvStream {
147 inner: FlowControl,
148}
149
150/// A handle to release window capacity to a remote stream.
151///
152/// This type allows the caller to manage inbound data [flow control]. The
153/// caller is expected to call [`release_capacity`] after dropping data frames.
154///
155/// # Overview
156///
157/// Each stream has a window size. This window size is the maximum amount of
158/// inbound data that can be in-flight. In-flight data is defined as data that
159/// has been received, but not yet released.
160///
161/// When a stream is created, the window size is set to the connection's initial
162/// window size value. When a data frame is received, the window size is then
163/// decremented by size of the data frame before the data is provided to the
164/// caller. As the caller finishes using the data, [`release_capacity`] must be
165/// called. This will then increment the window size again, allowing the peer to
166/// send more data.
167///
168/// There is also a connection level window as well as the stream level window.
169/// Received data counts against the connection level window as well and calls
170/// to [`release_capacity`] will also increment the connection level window.
171///
172/// # Sending `WINDOW_UPDATE` frames
173///
174/// `WINDOW_UPDATE` frames will not be sent out for **every** call to
175/// `release_capacity`, as this would end up slowing down the protocol. Instead,
176/// `h2` waits until the window size is increased to a certain threshold and
177/// then sends out a single `WINDOW_UPDATE` frame representing all the calls to
178/// `release_capacity` since the last `WINDOW_UPDATE` frame.
179///
180/// This essentially batches window updating.
181///
182/// # Scenarios
183///
184/// Following is a basic scenario with an HTTP/2 connection containing a
185/// single active stream.
186///
187/// * A new stream is activated. The receive window is initialized to 1024 (the value of the initial
188/// window size for this connection).
189/// * A `DATA` frame is received containing a payload of 600 bytes.
190/// * The receive window size is reduced to 424 bytes.
191/// * [`release_capacity`] is called with 200.
192/// * The receive window size is now 624 bytes. The peer may send no more than this.
193/// * A `DATA` frame is received with a payload of 624 bytes.
194/// * The window size is now 0 bytes. The peer may not send any more data.
195/// * [`release_capacity`] is called with 1024.
196/// * The receive window size is now 1024 bytes. The peer may now send more
197/// data.
198///
199/// [flow control]: ../index.html#flow-control
200/// [`release_capacity`]: struct.FlowControl.html#method.release_capacity
201#[derive(Clone, Debug)]
202pub struct FlowControl {
203 inner: proto::OpaqueStreamRef,
204}
205
206/// A handle to send and receive PING frames with the peer.
207// NOT Clone on purpose
208pub struct PingPong {
209 inner: proto::UserPings,
210}
211
212/// Sent via [`PingPong`][] to send a PING frame to a peer.
213///
214/// [`PingPong`]: struct.PingPong.html
215pub struct Ping {
216 _p: (),
217}
218
219/// Received via [`PingPong`][] when a peer acknowledges a [`Ping`][].
220///
221/// [`PingPong`]: struct.PingPong.html
222/// [`Ping`]: struct.Ping.html
223pub struct Pong {
224 _p: (),
225}
226
227// ===== impl SendStream =====
228
229impl<B: Buf> SendStream<B> {
230 pub(crate) fn new(inner: proto::StreamRef<B>) -> Self {
231 SendStream { inner }
232 }
233
234 /// Requests capacity to send data.
235 ///
236 /// This function is used to express intent to send data. This requests
237 /// connection level capacity. Once the capacity is available, it is
238 /// assigned to the stream and not reused by other streams.
239 ///
240 /// This function may be called repeatedly. The `capacity` argument is the
241 /// **total** amount of requested capacity. Sequential calls to
242 /// `reserve_capacity` are *not* additive. Given the following:
243 ///
244 /// ```rust
245 /// # use h2::*;
246 /// # fn doc(mut send_stream: SendStream<&'static [u8]>) {
247 /// send_stream.reserve_capacity(100);
248 /// send_stream.reserve_capacity(200);
249 /// # }
250 /// ```
251 ///
252 /// After the second call to `reserve_capacity`, the *total* requested
253 /// capacity will be 200.
254 ///
255 /// `reserve_capacity` is also used to cancel previous capacity requests.
256 /// Given the following:
257 ///
258 /// ```rust
259 /// # use h2::*;
260 /// # fn doc(mut send_stream: SendStream<&'static [u8]>) {
261 /// send_stream.reserve_capacity(100);
262 /// send_stream.reserve_capacity(0);
263 /// # }
264 /// ```
265 ///
266 /// After the second call to `reserve_capacity`, the *total* requested
267 /// capacity will be 0, i.e. there is no requested capacity for the stream.
268 ///
269 /// If `reserve_capacity` is called with a lower value than the amount of
270 /// capacity **currently** assigned to the stream, this capacity will be
271 /// returned to the connection to be re-assigned to other streams.
272 ///
273 /// Also, the amount of capacity that is reserved gets decremented as data
274 /// is sent. For example:
275 ///
276 /// ```rust
277 /// # use h2::*;
278 /// # async fn doc(mut send_stream: SendStream<&'static [u8]>) {
279 /// send_stream.reserve_capacity(100);
280 ///
281 /// send_stream.send_data(b"hello", false).unwrap();
282 /// // At this point, the total amount of requested capacity is 95 bytes.
283 ///
284 /// // Calling `reserve_capacity` with `100` again essentially requests an
285 /// // additional 5 bytes.
286 /// send_stream.reserve_capacity(100);
287 /// # }
288 /// ```
289 ///
290 /// See [Flow control](struct.SendStream.html#flow-control) for an overview
291 /// of how send flow control works.
292 pub fn reserve_capacity(&mut self, capacity: usize) {
293 // TODO: Check for overflow
294 self.inner.reserve_capacity(capacity as WindowSize)
295 }
296
297 /// Returns the stream's current send capacity.
298 ///
299 /// This allows the caller to check the current amount of available capacity
300 /// before sending data.
301 pub fn capacity(&self) -> usize {
302 self.inner.capacity() as usize
303 }
304
305 /// Requests to be notified when the stream's capacity increases.
306 ///
307 /// Before calling this, capacity should be requested with
308 /// `reserve_capacity`. Once capacity is requested, the connection will
309 /// assign capacity to the stream **as it becomes available**. There is no
310 /// guarantee as to when and in what increments capacity gets assigned to
311 /// the stream.
312 ///
313 /// To get notified when the available capacity increases, the caller calls
314 /// `poll_capacity`, which returns `Ready(Some(n))` when `n` has been
315 /// increased by the connection. Note that `n` here represents the **total**
316 /// amount of assigned capacity at that point in time. It is also possible
317 /// that `n` is lower than the previous call if, since then, the caller has
318 /// sent data.
319 pub fn poll_capacity(
320 &mut self,
321 cx: &mut Context,
322 ) -> Poll<Option<Result<usize, crate::h2::Error>>> {
323 self.inner
324 .poll_capacity(cx)
325 .map_ok(|w| w as usize)
326 .map_err(Into::into)
327 }
328
329 /// Sends a single data frame to the remote peer.
330 ///
331 /// This function may be called repeatedly as long as `end_of_stream` is set
332 /// to `false`. Setting `end_of_stream` to `true` sets the end stream flag
333 /// on the data frame. Any further calls to `send_data` or `send_trailers`
334 /// will return an [`Error`].
335 ///
336 /// `send_data` can be called without reserving capacity. In this case, the
337 /// data is buffered and the capacity is implicitly requested. Once the
338 /// capacity becomes available, the data is flushed to the connection.
339 /// However, this buffering is unbounded. As such, sending large amounts of
340 /// data without reserving capacity before hand could result in large
341 /// amounts of data being buffered in memory.
342 ///
343 /// [`Error`]: struct.Error.html
344 pub fn send_data(&mut self, data: B, end_of_stream: bool) -> Result<(), crate::h2::Error> {
345 self.inner
346 .send_data(data, end_of_stream)
347 .map_err(Into::into)
348 }
349
350 /// Sends trailers to the remote peer.
351 ///
352 /// Sending trailers implicitly closes the send stream. Once the send stream
353 /// is closed, no more data can be sent.
354 pub fn send_trailers(&mut self, trailers: HeaderMap) -> Result<(), crate::h2::Error> {
355 self.inner.send_trailers(trailers).map_err(Into::into)
356 }
357
358 /// Resets the stream.
359 ///
360 /// This cancels the request / response exchange. If the response has not
361 /// yet been received, the associated `ResponseFuture` will return an
362 /// [`Error`] to reflect the canceled exchange.
363 ///
364 /// [`Error`]: struct.Error.html
365 pub fn send_reset(&mut self, reason: Reason) {
366 self.inner.send_reset(reason)
367 }
368
369 /// Polls to be notified when the client resets this stream.
370 ///
371 /// If stream is still open, this returns `Poll::Pending`, and
372 /// registers the task to be notified if a `RST_STREAM` is received.
373 ///
374 /// If a `RST_STREAM` frame is received for this stream, calling this
375 /// method will yield the `Reason` for the reset.
376 ///
377 /// # Error
378 ///
379 /// If connection sees an error, this returns that error instead of a
380 /// `Reason`.
381 pub fn poll_reset(&mut self, cx: &mut Context) -> Poll<Result<Reason, crate::h2::Error>> {
382 self.inner.poll_reset(cx, proto::PollReset::Streaming)
383 }
384
385 /// Returns the stream ID of this `SendStream`.
386 ///
387 /// # Panics
388 ///
389 /// If the lock on the stream store has been poisoned.
390 pub fn stream_id(&self) -> StreamId {
391 StreamId::from_internal(self.inner.stream_id())
392 }
393}
394
395// ===== impl StreamId =====
396
397impl StreamId {
398 pub(crate) fn from_internal(id: crate::h2::frame::StreamId) -> Self {
399 StreamId(id.into())
400 }
401
402 /// Returns the `u32` corresponding to this `StreamId`
403 ///
404 /// # Note
405 ///
406 /// This is the same as the `From<StreamId>` implementation, but
407 /// included as an inherent method because that implementation doesn't
408 /// appear in rustdocs, as well as a way to force the type instead of
409 /// relying on inference.
410 pub fn as_u32(&self) -> u32 {
411 (*self).into()
412 }
413}
414// ===== impl RecvStream =====
415
416impl RecvStream {
417 pub(crate) fn new(inner: FlowControl) -> Self {
418 RecvStream { inner }
419 }
420
421 /// Get the next data frame.
422 pub async fn data(&mut self) -> Option<Result<Bytes, crate::h2::Error>> {
423 std::future::poll_fn(move |cx| self.poll_data(cx)).await
424 }
425
426 /// Get optional trailers for this stream.
427 pub async fn trailers(&mut self) -> Result<Option<HeaderMap>, crate::h2::Error> {
428 std::future::poll_fn(move |cx| self.poll_trailers(cx)).await
429 }
430
431 /// Poll for the next data frame.
432 pub fn poll_data(&mut self, cx: &Context<'_>) -> Poll<Option<Result<Bytes, crate::h2::Error>>> {
433 self.inner.inner.poll_data(cx).map_err(Into::into)
434 }
435
436 #[doc(hidden)]
437 pub fn poll_trailers(
438 &mut self,
439 cx: &mut Context,
440 ) -> Poll<Result<Option<HeaderMap>, crate::h2::Error>> {
441 match ready!(self.inner.inner.poll_trailers(cx)) {
442 Some(Ok(map)) => Poll::Ready(Ok(Some(map))),
443 Some(Err(e)) => Poll::Ready(Err(e.into())),
444 None => Poll::Ready(Ok(None)),
445 }
446 }
447
448 /// Returns true if the receive half has reached the end of stream.
449 ///
450 /// A return value of `true` means that calls to `poll` and `poll_trailers`
451 /// will both return `None`.
452 pub fn is_end_stream(&self) -> bool {
453 self.inner.inner.is_end_stream()
454 }
455
456 /// Get a mutable reference to this stream's `FlowControl`.
457 ///
458 /// It can be used immediately, or cloned to be used later.
459 pub fn flow_control(&mut self) -> &mut FlowControl {
460 &mut self.inner
461 }
462
463 /// Returns the stream ID of this stream.
464 ///
465 /// # Panics
466 ///
467 /// If the lock on the stream store has been poisoned.
468 pub fn stream_id(&self) -> StreamId {
469 self.inner.stream_id()
470 }
471}
472
473impl Body for RecvStream {
474 type Data = Bytes;
475 type Error = crate::h2::Error;
476
477 async fn next_data(&mut self) -> Option<Result<Self::Data, Self::Error>> {
478 self.data().await.map(|res| {
479 res.and_then(|bytes| {
480 self.flow_control()
481 .release_capacity(bytes.len())
482 .map(|_| bytes)
483 })
484 })
485 }
486
487 fn stream_hint(&self) -> crate::common::body::StreamHint {
488 crate::common::body::StreamHint::Stream
489 }
490}
491
492#[cfg(feature = "stream")]
493impl futures_core::Stream for RecvStream {
494 type Item = Result<Bytes, crate::h2::Error>;
495
496 fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
497 self.poll_data(cx)
498 }
499}
500
501impl fmt::Debug for RecvStream {
502 fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
503 fmt.debug_struct("RecvStream")
504 .field("inner", &self.inner)
505 .finish()
506 }
507}
508
509impl Drop for RecvStream {
510 fn drop(&mut self) {
511 // Eagerly clear any received DATA frames now, since its no longer
512 // possible to retrieve them. However, this will be called
513 // again once *all* stream refs have been dropped, since
514 // this won't send a RST_STREAM frame, in case the user wishes to
515 // still *send* DATA.
516 self.inner.inner.clear_recv_buffer();
517 }
518}
519
520// ===== impl FlowControl =====
521
522impl FlowControl {
523 pub(crate) fn new(inner: proto::OpaqueStreamRef) -> Self {
524 FlowControl { inner }
525 }
526
527 /// Returns the stream ID of the stream whose capacity will
528 /// be released by this `FlowControl`.
529 pub fn stream_id(&self) -> StreamId {
530 StreamId::from_internal(self.inner.stream_id())
531 }
532
533 /// Get the current available capacity of data this stream *could* receive.
534 pub fn available_capacity(&self) -> isize {
535 self.inner.available_recv_capacity()
536 }
537
538 /// Get the currently *used* capacity for this stream.
539 ///
540 /// This is the amount of bytes that can be released back to the remote.
541 pub fn used_capacity(&self) -> usize {
542 self.inner.used_recv_capacity() as usize
543 }
544
545 /// Release window capacity back to remote stream.
546 ///
547 /// This releases capacity back to the stream level and the connection level
548 /// windows. Both window sizes will be increased by `sz`.
549 ///
550 /// See [struct level] documentation for more details.
551 ///
552 /// # Errors
553 ///
554 /// This function errors if increasing the receive window size by `sz` would
555 /// result in a window size greater than the target window size. In other
556 /// words, the caller cannot release more capacity than data has been
557 /// received. If 1024 bytes of data have been received, at most 1024 bytes
558 /// can be released.
559 ///
560 /// [struct level]: #
561 pub fn release_capacity(&mut self, sz: usize) -> Result<(), crate::h2::Error> {
562 if sz > proto::MAX_WINDOW_SIZE as usize {
563 return Err(UserError::ReleaseCapacityTooBig.into());
564 }
565 self.inner
566 .release_capacity(sz as proto::WindowSize)
567 .map_err(Into::into)
568 }
569}
570
571// ===== impl PingPong =====
572
573impl PingPong {
574 pub(crate) fn new(inner: proto::UserPings) -> Self {
575 PingPong { inner }
576 }
577
578 /// Send a PING frame and wait for the peer to send the pong.
579 pub async fn ping(&mut self, ping: Ping) -> Result<Pong, crate::h2::Error> {
580 self.send_ping(ping)?;
581 std::future::poll_fn(|cx| self.poll_pong(cx)).await
582 }
583
584 #[doc(hidden)]
585 pub fn send_ping(&mut self, ping: Ping) -> Result<(), crate::h2::Error> {
586 // Passing a `Ping` here is just to be forwards-compatible with
587 // eventually allowing choosing a ping payload. For now, we can
588 // just ignore it.
589 let _ = ping;
590
591 self.inner.send_ping().map_err(|err| match err {
592 Some(err) => err.into(),
593 None => UserError::SendPingWhilePending.into(),
594 })
595 }
596
597 #[doc(hidden)]
598 pub fn poll_pong(&mut self, cx: &mut Context) -> Poll<Result<Pong, crate::h2::Error>> {
599 ready!(self.inner.poll_pong(cx))?;
600 Poll::Ready(Ok(Pong { _p: () }))
601 }
602}
603
604impl fmt::Debug for PingPong {
605 fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
606 fmt.debug_struct("PingPong").finish()
607 }
608}
609
610// ===== impl Ping =====
611
612impl Ping {
613 /// Creates a new opaque `Ping` to be sent via a [`PingPong`][].
614 ///
615 /// The payload is "opaque", such that it shouldn't be depended on.
616 ///
617 /// [`PingPong`]: struct.PingPong.html
618 pub fn opaque() -> Ping {
619 Ping { _p: () }
620 }
621}
622
623impl fmt::Debug for Ping {
624 fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
625 fmt.debug_struct("Ping").finish()
626 }
627}
628
629// ===== impl Pong =====
630
631impl fmt::Debug for Pong {
632 fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
633 fmt.debug_struct("Pong").finish()
634 }
635}