1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
mod chunked;
mod fixed;

use crate::request::Metadata;
use futures_io::AsyncWrite;
use std::io::Result;
use std::pin::Pin;
use std::task::{Context, Poll};

/// An in-progress HTTP request which is currently sending a request body.
///
/// The `'socket` lifetime parameter is the lifetime of the transport socket. The `Socket` type
/// parameter is the type of transport-layer socket over which the HTTP request will be sent.
#[derive(Debug, Eq, PartialEq)]
enum Mode<'socket, Socket: AsyncWrite + ?Sized> {
	/// The request body has a fixed size given by a `content-length` header or by the inherent
	/// properties of the request (e.g. a `HEAD` request always having a zero-length body).
	Fixed(fixed::Send<'socket, Socket>),

	/// The request body is encoded using chunked transfer encoding.
	Chunked(chunked::Send<'socket, Socket>),
}

/// An in-progress HTTP request which is currently sending a request body.
///
/// After the headers are sent, an instance of this type is obtained. It implements
/// [`AsyncWrite`](futures_io::AsyncWrite), which allows the application to write the request body.
/// When the request body is finished (or immediately, if there is no request body to send),
/// `finish` should be called to proceed to the next step.
///
/// The `'socket` lifetime parameter is the lifetime of the transport socket. The `Socket` type
/// parameter is the type of transport-layer socket over which the HTTP request will be sent.
#[derive(Debug, Eq, PartialEq)]
pub struct Send<'socket, Socket: AsyncWrite + ?Sized> {
	/// The underlying representation.
	inner: Mode<'socket, Socket>,

	/// The request metadata.
	metadata: Metadata,
}

impl<'socket, Socket: AsyncWrite + ?Sized> Send<'socket, Socket> {
	/// Constructs a new `Send` to send a request body of a fixed length.
	///
	/// The `socket` parameter is the transport-layer socket over which the HTTP request will be
	/// sent, and over which the request headers must already have been sent. The `head` parameter
	/// indicates whether the request method was `HEAD`. The `connection_close` parameter indicates
	/// whether the request headers contained `connection: close`. The `length` parameter is the
	/// length of the body to send.
	pub(crate) fn new_fixed(
		socket: Pin<&'socket mut Socket>,
		metadata: Metadata,
		length: u64,
	) -> Self {
		Self {
			inner: Mode::Fixed(fixed::Send::new(socket, length)),
			metadata,
		}
	}

	/// Constructs a new `Send` to send a request body using chunked encoding.
	///
	/// The `socket` parameter is the transport-layer socket over which the HTTP request will be
	/// sent, and over which the request headers must already have been sent. The `head` parameter
	/// indicates whether the request method was `HEAD`. The `connection_close` parameter indicates
	/// whether the request headers contained `connection: close`.
	pub(crate) fn new_chunked(socket: Pin<&'socket mut Socket>, metadata: Metadata) -> Self {
		Self {
			inner: Mode::Chunked(chunked::Send::new(socket)),
			metadata,
		}
	}

	/// Gives a hint about how many bytes of body remain to be sent.
	///
	/// The `length` parameter is the minimum number of bytes of body that will be sent from this
	/// point forward.
	///
	/// The application must send at least `length` bytes before finishing the request. However, it
	/// is permitted to send *more* than `length` bytes (subject to any other constraints, such as
	/// not sending more than specified in the `content-length` header).
	///
	/// For a fixed-length request (e.g. one whose length was set by a `content-length` header),
	/// this function does nothing.
	///
	/// For a chunked request, this function sets the size of the next chunk; this allows a large
	/// chunk size to be sent without the entire contents of the chunk needing to be available at
	/// once. For example, if the application knows that the body will contain *at least* another
	/// 100,000 bytes, it could call this function passing a `length` parameter of 100,000, but
	/// then write just 10,000 bytes at a time, ten times over. Without calling this function, that
	/// would result in ten 10,000-byte chunks being sent; by calling this function, a single
	/// 100,000-byte chunk is sent instead, reducing the overhead due to chunk headers, without
	/// requiring that the application load all 100,000 bytes into memory at once.
	pub fn hint_length(&mut self, length: u64) {
		match &mut self.inner {
			Mode::Fixed(_) => (),
			Mode::Chunked(inner) => inner.hint_length(length),
		}
	}

	/// Finishes the request.
	///
	/// On success, a metadata instance is returned. This instance must be passed in when the
	/// application is ready to receive the response headers.
	///
	/// *Important*: This function does not flush the socket. If the application is going to read a
	/// response, or if the socket is a buffered wrapper around an underlying socket and the
	/// application intends to unwrap the wrapper, it must flush the socket before proceeding to
	/// read the response, otherwise a hang might occur due to the server waiting for the remainder
	/// of the request which will never arrive. If the application intends to send another request
	/// instead, using HTTP pipelining, the flush is not necessary.
	///
	/// # Errors
	/// This function returns an error if writing to the underlying socket fails.
	///
	/// # Panics
	/// This function panics in a debug build:
	/// * if the body has a fixed length and was not fully sent
	/// * if the body is chunked and the most recent chunk was not fully sent
	///
	/// These are debug-build panics, not errors, because it is expected that the application will
	/// provide a request body consistent with the request headers; since both of these things are
	/// under the application’s control, an inconsistency between them represents a bug in the
	/// application.
	pub async fn finish(self) -> Result<Metadata> {
		match self.inner {
			Mode::Fixed(inner) => inner.finish(),
			Mode::Chunked(inner) => inner.finish().await?,
		}
		Ok(self.metadata)
	}
}

impl<Socket: AsyncWrite + ?Sized> AsyncWrite for Send<'_, Socket> {
	fn poll_write(
		mut self: Pin<&mut Self>,
		cx: &mut Context<'_>,
		buf: &[u8],
	) -> Poll<Result<usize>> {
		match self.inner {
			Mode::Fixed(ref mut inner) => Pin::new(inner).poll_write(cx, buf),
			Mode::Chunked(ref mut inner) => Pin::new(inner).poll_write(cx, buf),
		}
	}

	fn poll_write_vectored(
		mut self: Pin<&mut Self>,
		cx: &mut Context<'_>,
		bufs: &[std::io::IoSlice<'_>],
	) -> Poll<Result<usize>> {
		match self.inner {
			Mode::Fixed(ref mut inner) => Pin::new(inner).poll_write_vectored(cx, bufs),
			Mode::Chunked(ref mut inner) => Pin::new(inner).poll_write_vectored(cx, bufs),
		}
	}

	fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<()>> {
		match self.inner {
			Mode::Fixed(ref mut inner) => Pin::new(inner).poll_flush(cx),
			Mode::Chunked(ref mut inner) => Pin::new(inner).poll_flush(cx),
		}
	}

	fn poll_close(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<()>> {
		match self.inner {
			Mode::Fixed(ref mut inner) => Pin::new(inner).poll_close(cx),
			Mode::Chunked(ref mut inner) => Pin::new(inner).poll_close(cx),
		}
	}
}

#[cfg(test)]
mod test {
	use futures_io::AsyncWrite;
	use std::pin::Pin;

	pub trait AsyncWriteExt: AsyncWrite {
		fn write<'a>(self: Pin<&'a mut Self>, data: &'a [u8]) -> WriteFuture<'a, Self> {
			WriteFuture { sink: self, data }
		}
	}

	impl<W: AsyncWrite + ?Sized> AsyncWriteExt for W {}

	/// A future that writes data to an `AsyncWrite`.
	#[derive(Debug)]
	pub struct WriteFuture<'a, T: AsyncWrite + ?Sized> {
		sink: Pin<&'a mut T>,
		data: &'a [u8],
	}

	impl<'a, T: AsyncWrite + ?Sized> std::future::Future for WriteFuture<'a, T> {
		type Output = std::io::Result<usize>;

		fn poll(
			mut self: Pin<&mut Self>,
			cx: &mut std::task::Context<'_>,
		) -> std::task::Poll<Self::Output> {
			let data = self.data;
			self.sink.as_mut().poll_write(cx, data)
		}
	}
}