ant_quic/quinn_high_level/send_stream.rs
1use std::{
2 future::{Future, poll_fn},
3 io,
4 pin::{Pin, pin},
5 task::{Context, Poll},
6};
7
8use bytes::Bytes;
9use crate::{ClosedStream, ConnectionError, FinishError, StreamId, Written};
10use thiserror::Error;
11
12use super::connection::{ConnectionRef, State};
13use crate::VarInt;
14
15/// A stream that can only be used to send data
16///
17/// If dropped, streams that haven't been explicitly [`reset()`] will be implicitly [`finish()`]ed,
18/// continuing to (re)transmit previously written data until it has been fully acknowledged or the
19/// connection is closed.
20///
21/// # Cancellation
22///
23/// A `write` method is said to be *cancel-safe* when dropping its future before the future becomes
24/// ready will always result in no data being written to the stream. This is true of methods which
25/// succeed immediately when any progress is made, and is not true of methods which might need to
26/// perform multiple writes internally before succeeding. Each `write` method documents whether it is
27/// cancel-safe.
28///
29/// [`reset()`]: SendStream::reset
30/// [`finish()`]: SendStream::finish
31#[derive(Debug)]
32pub struct SendStream {
33 conn: ConnectionRef,
34 stream: StreamId,
35 is_0rtt: bool,
36}
37
38impl SendStream {
39 pub(crate) fn new(conn: ConnectionRef, stream: StreamId, is_0rtt: bool) -> Self {
40 Self {
41 conn,
42 stream,
43 is_0rtt,
44 }
45 }
46
47 /// Write bytes to the stream
48 ///
49 /// Yields the number of bytes written on success. Congestion and flow control may cause this to
50 /// be shorter than `buf.len()`, indicating that only a prefix of `buf` was written.
51 ///
52 /// This operation is cancel-safe.
53 pub async fn write(&mut self, buf: &[u8]) -> Result<usize, WriteError> {
54 poll_fn(|cx| self.execute_poll(cx, |s| s.write(buf))).await
55 }
56
57 /// Convenience method to write an entire buffer to the stream
58 ///
59 /// This operation is *not* cancel-safe.
60 pub async fn write_all(&mut self, mut buf: &[u8]) -> Result<(), WriteError> {
61 while !buf.is_empty() {
62 let written = self.write(buf).await?;
63 buf = &buf[written..];
64 }
65 Ok(())
66 }
67
68 /// Write chunks to the stream
69 ///
70 /// Yields the number of bytes and chunks written on success.
71 /// Congestion and flow control may cause this to be shorter than `buf.len()`,
72 /// indicating that only a prefix of `bufs` was written
73 ///
74 /// This operation is cancel-safe.
75 pub async fn write_chunks(&mut self, bufs: &mut [Bytes]) -> Result<Written, WriteError> {
76 poll_fn(|cx| self.execute_poll(cx, |s| s.write_chunks(bufs))).await
77 }
78
79 /// Convenience method to write a single chunk in its entirety to the stream
80 ///
81 /// This operation is *not* cancel-safe.
82 pub async fn write_chunk(&mut self, buf: Bytes) -> Result<(), WriteError> {
83 self.write_all_chunks(&mut [buf]).await?;
84 Ok(())
85 }
86
87 /// Convenience method to write an entire list of chunks to the stream
88 ///
89 /// This operation is *not* cancel-safe.
90 pub async fn write_all_chunks(&mut self, mut bufs: &mut [Bytes]) -> Result<(), WriteError> {
91 while !bufs.is_empty() {
92 let written = self.write_chunks(bufs).await?;
93 bufs = &mut bufs[written.chunks..];
94 }
95 Ok(())
96 }
97
98 fn execute_poll<F, R>(&mut self, cx: &mut Context, write_fn: F) -> Poll<Result<R, WriteError>>
99 where
100 F: FnOnce(&mut crate::SendStream) -> Result<R, crate::WriteError>,
101 {
102 use crate::WriteError::*;
103 let mut conn = self.conn.state.lock("SendStream::poll_write");
104 if self.is_0rtt {
105 conn.check_0rtt()
106 .map_err(|()| WriteError::ZeroRttRejected)?;
107 }
108 if let Some(ref x) = conn.error {
109 return Poll::Ready(Err(WriteError::ConnectionLost(x.clone())));
110 }
111
112 let result = match write_fn(&mut conn.inner.send_stream(self.stream)) {
113 Ok(result) => result,
114 Err(Blocked) => {
115 conn.blocked_writers.insert(self.stream, cx.waker().clone());
116 return Poll::Pending;
117 }
118 Err(Stopped(error_code)) => {
119 return Poll::Ready(Err(WriteError::Stopped(error_code)));
120 }
121 Err(ClosedStream) => {
122 return Poll::Ready(Err(WriteError::ClosedStream));
123 }
124 Err(ConnectionClosed) => {
125 return Poll::Ready(Err(WriteError::ClosedStream));
126 }
127 };
128
129 conn.wake();
130 Poll::Ready(Ok(result))
131 }
132
133 /// Notify the peer that no more data will ever be written to this stream
134 ///
135 /// It is an error to write to a [`SendStream`] after `finish()`ing it. [`reset()`](Self::reset)
136 /// may still be called after `finish` to abandon transmission of any stream data that might
137 /// still be buffered.
138 ///
139 /// To wait for the peer to receive all buffered stream data, see [`stopped()`](Self::stopped).
140 ///
141 /// May fail if [`finish()`](Self::finish) or [`reset()`](Self::reset) was previously
142 /// called. This error is harmless and serves only to indicate that the caller may have
143 /// incorrect assumptions about the stream's state.
144 pub fn finish(&mut self) -> Result<(), ClosedStream> {
145 let mut conn = self.conn.state.lock("finish");
146 match conn.inner.send_stream(self.stream).finish() {
147 Ok(()) => {
148 conn.wake();
149 Ok(())
150 }
151 Err(FinishError::ClosedStream) => Err(ClosedStream::default()),
152 // Harmless. If the application needs to know about stopped streams at this point, it
153 // should call `stopped`.
154 Err(FinishError::Stopped(_)) => Ok(()),
155 Err(FinishError::ConnectionClosed) => Err(ClosedStream::default()),
156 }
157 }
158
159 /// Close the send stream immediately.
160 ///
161 /// No new data can be written after calling this method. Locally buffered data is dropped, and
162 /// previously transmitted data will no longer be retransmitted if lost. If an attempt has
163 /// already been made to finish the stream, the peer may still receive all written data.
164 ///
165 /// May fail if [`finish()`](Self::finish) or [`reset()`](Self::reset) was previously
166 /// called. This error is harmless and serves only to indicate that the caller may have
167 /// incorrect assumptions about the stream's state.
168 pub fn reset(&mut self, error_code: VarInt) -> Result<(), ClosedStream> {
169 let mut conn = self.conn.state.lock("SendStream::reset");
170 if self.is_0rtt && conn.check_0rtt().is_err() {
171 return Ok(());
172 }
173 conn.inner.send_stream(self.stream).reset(error_code)?;
174 conn.wake();
175 Ok(())
176 }
177
178 /// Set the priority of the send stream
179 ///
180 /// Every send stream has an initial priority of 0. Locally buffered data from streams with
181 /// higher priority will be transmitted before data from streams with lower priority. Changing
182 /// the priority of a stream with pending data may only take effect after that data has been
183 /// transmitted. Using many different priority levels per connection may have a negative
184 /// impact on performance.
185 pub fn set_priority(&self, priority: i32) -> Result<(), ClosedStream> {
186 let mut conn = self.conn.state.lock("SendStream::set_priority");
187 conn.inner.send_stream(self.stream).set_priority(priority)?;
188 Ok(())
189 }
190
191 /// Get the priority of the send stream
192 pub fn priority(&self) -> Result<i32, ClosedStream> {
193 let mut conn = self.conn.state.lock("SendStream::priority");
194 conn.inner.send_stream(self.stream).priority()
195 }
196
197 /// Completes when the peer stops the stream or reads the stream to completion
198 ///
199 /// Yields `Some` with the stop error code if the peer stops the stream. Yields `None` if the
200 /// local side [`finish()`](Self::finish)es the stream and then the peer acknowledges receipt
201 /// of all stream data (although not necessarily the processing of it), after which the peer
202 /// closing the stream is no longer meaningful.
203 ///
204 /// For a variety of reasons, the peer may not send acknowledgements immediately upon receiving
205 /// data. As such, relying on `stopped` to know when the peer has read a stream to completion
206 /// may introduce more latency than using an application-level response of some sort.
207 pub fn stopped(
208 &self,
209 ) -> impl Future<Output = Result<Option<VarInt>, StoppedError>> + Send + Sync + 'static {
210 let conn = self.conn.clone();
211 let stream = self.stream;
212 let is_0rtt = self.is_0rtt;
213 async move {
214 loop {
215 // The `Notify::notified` future needs to be created while the lock is being held,
216 // otherwise a wakeup could be missed if triggered inbetween releasing the lock
217 // and creating the future.
218 // The lock may only be held in a block without `await`s, otherwise the future
219 // becomes `!Send`. `Notify::notified` is lifetime-bound to `Notify`, therefore
220 // we need to declare `notify` outside of the block, and initialize it inside.
221 let notify;
222 {
223 let mut conn = conn.state.lock("SendStream::stopped");
224 if let Some(output) = send_stream_stopped(&mut conn, stream, is_0rtt) {
225 return output;
226 }
227
228 notify = conn.stopped.entry(stream).or_default().clone();
229 notify.notified()
230 }
231 .await
232 }
233 }
234 }
235
236 /// Get the identity of this stream
237 pub fn id(&self) -> StreamId {
238 self.stream
239 }
240
241 /// Attempt to write bytes from buf into the stream.
242 ///
243 /// On success, returns Poll::Ready(Ok(num_bytes_written)).
244 ///
245 /// If the stream is not ready for writing, the method returns Poll::Pending and arranges
246 /// for the current task (via cx.waker().wake_by_ref()) to receive a notification when the
247 /// stream becomes writable or is closed.
248 pub fn poll_write(
249 self: Pin<&mut Self>,
250 cx: &mut Context,
251 buf: &[u8],
252 ) -> Poll<Result<usize, WriteError>> {
253 pin!(self.get_mut().write(buf)).as_mut().poll(cx)
254 }
255}
256
257/// Check if a send stream is stopped.
258///
259/// Returns `Some` if the stream is stopped or the connection is closed.
260/// Returns `None` if the stream is not stopped.
261fn send_stream_stopped(
262 conn: &mut State,
263 stream: StreamId,
264 is_0rtt: bool,
265) -> Option<Result<Option<VarInt>, StoppedError>> {
266 if is_0rtt && conn.check_0rtt().is_err() {
267 return Some(Err(StoppedError::ZeroRttRejected));
268 }
269 match conn.inner.send_stream(stream).stopped() {
270 Err(ClosedStream { .. }) => Some(Ok(None)),
271 Ok(Some(error_code)) => Some(Ok(Some(error_code))),
272 Ok(None) => conn.error.clone().map(|error| Err(error.into())),
273 }
274}
275
276/* TODO: Enable when futures-io feature is added
277#[cfg(feature = "futures-io")]
278impl futures_io::AsyncWrite for SendStream {
279 fn poll_write(self: Pin<&mut Self>, cx: &mut Context, buf: &[u8]) -> Poll<io::Result<usize>> {
280 self.poll_write(cx, buf).map_err(Into::into)
281 }
282
283 fn poll_flush(self: Pin<&mut Self>, _cx: &mut Context) -> Poll<io::Result<()>> {
284 Poll::Ready(Ok(()))
285 }
286
287 fn poll_close(self: Pin<&mut Self>, _cx: &mut Context) -> Poll<io::Result<()>> {
288 Poll::Ready(self.get_mut().finish().map_err(Into::into))
289 }
290}
291*/
292
293impl tokio::io::AsyncWrite for SendStream {
294 fn poll_write(
295 self: Pin<&mut Self>,
296 cx: &mut Context<'_>,
297 buf: &[u8],
298 ) -> Poll<io::Result<usize>> {
299 self.poll_write(cx, buf).map_err(Into::into)
300 }
301
302 fn poll_flush(self: Pin<&mut Self>, _cx: &mut Context) -> Poll<io::Result<()>> {
303 Poll::Ready(Ok(()))
304 }
305
306 fn poll_shutdown(self: Pin<&mut Self>, _cx: &mut Context) -> Poll<io::Result<()>> {
307 Poll::Ready(self.get_mut().finish().map_err(Into::into))
308 }
309}
310
311impl Drop for SendStream {
312 fn drop(&mut self) {
313 let mut conn = self.conn.state.lock("SendStream::drop");
314
315 // clean up any previously registered wakers
316 conn.blocked_writers.remove(&self.stream);
317
318 if conn.error.is_some() || (self.is_0rtt && conn.check_0rtt().is_err()) {
319 return;
320 }
321 match conn.inner.send_stream(self.stream).finish() {
322 Ok(()) => conn.wake(),
323 Err(FinishError::Stopped(reason)) => {
324 if conn.inner.send_stream(self.stream).reset(reason).is_ok() {
325 conn.wake();
326 }
327 }
328 // Already finished or reset, which is fine.
329 Err(FinishError::ClosedStream) => {}
330 Err(FinishError::ConnectionClosed) => {}
331 }
332 }
333}
334
335/// Errors that arise from writing to a stream
336#[derive(Debug, Error, Clone, PartialEq, Eq)]
337pub enum WriteError {
338 /// The peer is no longer accepting data on this stream
339 ///
340 /// Carries an application-defined error code.
341 #[error("sending stopped by peer: error {0}")]
342 Stopped(VarInt),
343 /// The connection was lost
344 #[error("connection lost")]
345 ConnectionLost(#[from] ConnectionError),
346 /// The stream has already been finished or reset
347 #[error("closed stream")]
348 ClosedStream,
349 /// This was a 0-RTT stream and the server rejected it
350 ///
351 /// Can only occur on clients for 0-RTT streams, which can be opened using
352 /// [`Connecting::into_0rtt()`].
353 ///
354 /// [`Connecting::into_0rtt()`]: crate::Connecting::into_0rtt()
355 #[error("0-RTT rejected")]
356 ZeroRttRejected,
357}
358
359impl From<ClosedStream> for WriteError {
360 #[inline]
361 fn from(_: ClosedStream) -> Self {
362 Self::ClosedStream
363 }
364}
365
366impl From<StoppedError> for WriteError {
367 fn from(x: StoppedError) -> Self {
368 match x {
369 StoppedError::ConnectionLost(e) => Self::ConnectionLost(e),
370 StoppedError::ZeroRttRejected => Self::ZeroRttRejected,
371 }
372 }
373}
374
375impl From<WriteError> for io::Error {
376 fn from(x: WriteError) -> Self {
377 use WriteError::*;
378 let kind = match x {
379 Stopped(_) | ZeroRttRejected => io::ErrorKind::ConnectionReset,
380 ConnectionLost(_) | ClosedStream => io::ErrorKind::NotConnected,
381 };
382 Self::new(kind, x)
383 }
384}
385
386/// Errors that arise while monitoring for a send stream stop from the peer
387#[derive(Debug, Error, Clone, PartialEq, Eq)]
388pub enum StoppedError {
389 /// The connection was lost
390 #[error("connection lost")]
391 ConnectionLost(#[from] ConnectionError),
392 /// This was a 0-RTT stream and the server rejected it
393 ///
394 /// Can only occur on clients for 0-RTT streams, which can be opened using
395 /// [`Connecting::into_0rtt()`].
396 ///
397 /// [`Connecting::into_0rtt()`]: crate::Connecting::into_0rtt()
398 #[error("0-RTT rejected")]
399 ZeroRttRejected,
400}
401
402impl From<StoppedError> for io::Error {
403 fn from(x: StoppedError) -> Self {
404 use StoppedError::*;
405 let kind = match x {
406 ZeroRttRejected => io::ErrorKind::ConnectionReset,
407 ConnectionLost(_) => io::ErrorKind::NotConnected,
408 };
409 Self::new(kind, x)
410 }
411}